1 Job-Runner
Virgil edited this page 2026-02-19 17:03:31 +00:00

Job Runner

Package: forge.lthn.ai/core/go-scm/jobrunner
Sub-packages: jobrunner/forgejo, jobrunner/handlers

Signal polling, handler dispatch, and JSONL audit journalling for automated pipeline operations. The job runner is the core automation loop that discovers work from Forgejo epic issues and dispatches it to handlers.

Architecture

The job runner follows a poll-dispatch pattern:

  1. Sources (JobSource) poll external systems for actionable work, producing PipelineSignal values
  2. The Poller iterates signals and finds the first matching Handler (JobHandler)
  3. Handlers execute actions and produce ActionResult values
  4. Results are recorded in the Journal (JSONL audit log) and reported back to the source
Sources ──poll──> Signals ──match──> Handlers ──execute──> Results ──journal──> JSONL
                                                                  ──report──> Source

Core Types

PipelineSignal

The structural snapshot of a child issue or PR, carrying all state needed for handler matching and execution:

type PipelineSignal struct {
    EpicNumber      int       // Parent epic issue number
    ChildNumber     int       // Child issue number
    PRNumber        int       // Linked pull request number
    RepoOwner       string    // Repository owner
    RepoName        string    // Repository name
    PRState         string    // "OPEN", "MERGED", "CLOSED"
    IsDraft         bool      // Whether the PR is a draft
    Mergeable       string    // "MERGEABLE", "CONFLICTING", "UNKNOWN"
    CheckStatus     string    // "SUCCESS", "FAILURE", "PENDING"
    ThreadsTotal    int       // Total review threads
    ThreadsResolved int       // Resolved review threads
    LastCommitSHA   string    // Head commit SHA
    LastCommitAt    time.Time // Head commit timestamp
    LastReviewAt    time.Time // Last review timestamp
    NeedsCoding     bool      // true if child has no PR (work not started)
    Assignee        string    // Issue assignee username
    IssueTitle      string    // Child issue title
    IssueBody       string    // Child issue body
    Type            string    // Signal type (e.g., "agent_completion")
    Success         bool      // Agent completion success flag
    Error           string    // Agent error message
    Message         string    // Agent completion message
}

// Helper methods
func (s *PipelineSignal) RepoFullName() string       // "owner/repo"
func (s *PipelineSignal) HasUnresolvedThreads() bool  // ThreadsTotal > ThreadsResolved

ActionResult

type ActionResult struct {
    Action      string        // Handler name
    RepoOwner   string
    RepoName    string
    EpicNumber  int
    ChildNumber int
    PRNumber    int
    Success     bool
    Error       string
    Timestamp   time.Time
    Duration    time.Duration
    Cycle       int           // Poll cycle number
}

Interfaces

// JobSource discovers actionable work from an external system.
type JobSource interface {
    Name() string
    Poll(ctx context.Context) ([]*PipelineSignal, error)
    Report(ctx context.Context, result *ActionResult) error
}

// JobHandler processes a single pipeline signal.
type JobHandler interface {
    Name() string
    Match(signal *PipelineSignal) bool
    Execute(ctx context.Context, signal *PipelineSignal) (*ActionResult, error)
}

Poller

The Poller is the main poll-dispatch loop:

poller := jobrunner.NewPoller(jobrunner.PollerConfig{
    Sources:      []jobrunner.JobSource{source},
    Handlers:     []jobrunner.JobHandler{handler1, handler2},
    Journal:      journal,
    PollInterval: 60 * time.Second, // Default: 60s
    DryRun:       false,
})

// Blocking loop -- runs one cycle immediately, then on each tick
err := poller.Run(ctx)

// Or run a single cycle
err := poller.RunOnce(ctx)

Dynamic Registration

Sources and handlers can be added at runtime:

poller.AddSource(newSource)
poller.AddHandler(newHandler)

Controls

poller.SetDryRun(true)   // Toggle dry-run mode
poller.DryRun()          // Check current mode
poller.Cycle()           // Get completed cycle count

Signal Matching

The poller uses first-match semantics: for each signal, handlers are checked in registration order, and the first handler whose Match() returns true is executed. This means handler ordering matters.

Forgejo Source

Package: forge.lthn.ai/core/go-scm/jobrunner/forgejo

The built-in source that polls Forgejo for epic issues and discovers actionable child tasks.

How It Works

  1. Lists all open issues with the epic label across configured repositories
  2. Parses epic bodies for checklist items: - [ ] #42 (unchecked) and - [x] #42 (checked)
  3. For each unchecked child:
    • Looks for a linked PR (any PR whose body references #<childNumber>)
    • If a PR exists: builds a signal with PR state, mergeability, and check status
    • If no PR exists but the child is assigned: builds a NeedsCoding signal for dispatch
import (
    forgejoSource "forge.lthn.ai/core/go-scm/jobrunner/forgejo"
    "forge.lthn.ai/core/go-scm/forge"
)

client, _ := forge.NewFromConfig("", "")

source := forgejoSource.New(forgejoSource.Config{
    Repos: []string{"core/go-scm", "core/go"},
}, client)

The source also implements Report(), which posts action results as comments on the epic issue.

Built-in Handlers

Package: forge.lthn.ai/core/go-scm/jobrunner/handlers

DispatchHandler

Dispatches coding work to remote agent machines via SSH. See AgentCI-and-Clotho for the full orchestration protocol.

Match Condition Action
NeedsCoding == true and assignee is a known agent Creates a JSON ticket and transfers it to the agent's queue via SSH

Workflow:

  1. Resolves assignee to agent config via agentci.Spinner.FindByForgejoUser
  2. Checks for duplicate dispatch (skips if in-progress or agent-completed label present)
  3. Adds in-progress label, removes agent-ready label
  4. Determines Clotho run mode (standard or dual)
  5. Builds DispatchTicket JSON with repo, issue, model, and runner details
  6. Transfers ticket via SSH with cat > path (stdin piping, no command injection)
  7. Transfers Forge token separately as a .env file with 0600 permissions
  8. On failure: adds agent-failed label, removes in-progress, posts error comment
handler := handlers.NewDispatchHandler(forgeClient, forgeURL, token, spinner)

CompletionHandler

Manages issue state when an agent finishes work.

Match Condition Action
Type == "agent_completion" Removes in-progress label; adds agent-completed (success) or agent-failed (failure); posts comment
handler := handlers.NewCompletionHandler(forgeClient)

EnableAutoMergeHandler

Merges PRs that are ready using squash strategy.

Match Condition Action
PR is open, not draft, mergeable, checks passing, no unresolved threads Squash-merges the PR
handler := handlers.NewEnableAutoMergeHandler(forgeClient)

PublishDraftHandler

Marks draft PRs as ready for review once checks pass.

Match Condition Action
PR is draft, open, and checks are SUCCESS Clears draft status via raw HTTP PATCH
handler := handlers.NewPublishDraftHandler(forgeClient)

DismissReviewsHandler

Dismisses stale "request changes" reviews on a PR.

Match Condition Action
PR is open and has unresolved threads Dismisses stale REQUEST_CHANGES reviews
handler := handlers.NewDismissReviewsHandler(forgeClient)

SendFixCommandHandler

Posts comments asking for fixes on problematic PRs.

Match Condition Action
PR is open and has merge conflicts Posts "Can you fix the merge conflict?"
PR is open, has unresolved threads, and checks are failing Posts "Can you fix the code reviews?"
handler := handlers.NewSendFixCommandHandler(forgeClient)

TickParentHandler

Ticks a child checkbox in the parent epic after the child's PR is merged.

Match Condition Action
PRState == "MERGED" Replaces - [ ] #N with - [x] #N in epic body; closes the child issue
handler := handlers.NewTickParentHandler(forgeClient)

Journal

The Journal writes audit entries to date-partitioned JSONL files with path-traversal protection:

journal, err := jobrunner.NewJournal("/var/log/agentci")

// Files are written as: /var/log/agentci/{owner}/{repo}/{date}.jsonl
// Example: /var/log/agentci/core/go-scm/2026-02-19.jsonl

Entry Structure

type JournalEntry struct {
    Timestamp string         // UTC ISO 8601
    Epic      int
    Child     int
    PR        int
    Repo      string         // "owner/repo"
    Action    string         // Handler name
    Signals   SignalSnapshot // PR state at time of action
    Result    ResultSnapshot // Action outcome
    Cycle     int            // Poll cycle number
}

Security

The journal validates all path components to prevent path traversal:

  • Owner and repo names must match ^[a-zA-Z0-9][a-zA-Z0-9._-]*$
  • Resolved paths are verified to stay within the base directory
  • Path separators, .., and empty strings are rejected

Since the poller uses first-match semantics, handler registration order determines priority. The recommended order:

  1. DispatchHandler -- Dispatch new work to agents
  2. CompletionHandler -- Handle agent completion events
  3. TickParentHandler -- Tick merged children in epic
  4. PublishDraftHandler -- Publish draft PRs with passing checks
  5. DismissReviewsHandler -- Dismiss stale reviews
  6. EnableAutoMergeHandler -- Merge ready PRs
  7. SendFixCommandHandler -- Request fixes (lowest priority)

Full Example

import (
    "forge.lthn.ai/core/go-scm/agentci"
    "forge.lthn.ai/core/go-scm/forge"
    "forge.lthn.ai/core/go-scm/jobrunner"
    forgejoSource "forge.lthn.ai/core/go-scm/jobrunner/forgejo"
    "forge.lthn.ai/core/go-scm/jobrunner/handlers"
)

// Set up clients
forgeClient, _ := forge.NewFromConfig("", "")
url, token, _ := forge.ResolveConfig("", "")

// Set up AgentCI
cfg, _ := config.New()
agents, _ := agentci.LoadActiveAgents(cfg)
clothoCfg, _ := agentci.LoadClothoConfig(cfg)
spinner := agentci.NewSpinner(clothoCfg, agents)

// Set up journal
journal, _ := jobrunner.NewJournal("/var/log/agentci")

// Set up source
source := forgejoSource.New(forgejoSource.Config{
    Repos: []string{"core/go-scm", "core/go"},
}, forgeClient)

// Build and run poller
poller := jobrunner.NewPoller(jobrunner.PollerConfig{
    Sources:  []jobrunner.JobSource{source},
    Handlers: []jobrunner.JobHandler{
        handlers.NewDispatchHandler(forgeClient, url, token, spinner),
        handlers.NewCompletionHandler(forgeClient),
        handlers.NewTickParentHandler(forgeClient),
        handlers.NewPublishDraftHandler(forgeClient),
        handlers.NewDismissReviewsHandler(forgeClient),
        handlers.NewEnableAutoMergeHandler(forgeClient),
        handlers.NewSendFixCommandHandler(forgeClient),
    },
    Journal:      journal,
    PollInterval: 60 * time.Second,
})

poller.Run(ctx)

See Also