Job Runner
Package: forge.lthn.ai/core/go-scm/jobrunner
Sub-packages: jobrunner/forgejo, jobrunner/handlers
Signal polling, handler dispatch, and JSONL audit journalling for automated pipeline operations. The job runner is the core automation loop that discovers work from Forgejo epic issues and dispatches it to handlers.
Architecture
The job runner follows a poll-dispatch pattern:
- Sources (JobSource) poll external systems for actionable work, producing PipelineSignal values
- The Poller iterates signals and finds the first matching Handler (JobHandler)
- Handlers execute actions and produce ActionResult values
- Results are recorded in the Journal (JSONL audit log) and reported back to the source
Sources ──poll──> Signals ──match──> Handlers ──execute──> Results ──journal──> JSONL
                                                                   ──report──> Source
Core Types
PipelineSignal
The structural snapshot of a child issue or PR, carrying all state needed for handler matching and execution:
type PipelineSignal struct {
EpicNumber int // Parent epic issue number
ChildNumber int // Child issue number
PRNumber int // Linked pull request number
RepoOwner string // Repository owner
RepoName string // Repository name
PRState string // "OPEN", "MERGED", "CLOSED"
IsDraft bool // Whether the PR is a draft
Mergeable string // "MERGEABLE", "CONFLICTING", "UNKNOWN"
CheckStatus string // "SUCCESS", "FAILURE", "PENDING"
ThreadsTotal int // Total review threads
ThreadsResolved int // Resolved review threads
LastCommitSHA string // Head commit SHA
LastCommitAt time.Time // Head commit timestamp
LastReviewAt time.Time // Last review timestamp
NeedsCoding bool // true if child has no PR (work not started)
Assignee string // Issue assignee username
IssueTitle string // Child issue title
IssueBody string // Child issue body
Type string // Signal type (e.g., "agent_completion")
Success bool // Agent completion success flag
Error string // Agent error message
Message string // Agent completion message
}
// Helper methods
func (s *PipelineSignal) RepoFullName() string // "owner/repo"
func (s *PipelineSignal) HasUnresolvedThreads() bool // ThreadsTotal > ThreadsResolved
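The two helper methods are fully described by their comments, so a minimal sketch of them is short. The struct below is trimmed to the four fields the helpers read; the method bodies are an assumption based on the comments above, not a copy of the package source.

```go
package main

import "fmt"

// PipelineSignal is trimmed to the fields the two helpers need.
type PipelineSignal struct {
	RepoOwner       string
	RepoName        string
	ThreadsTotal    int
	ThreadsResolved int
}

// RepoFullName returns the repository in "owner/repo" form.
func (s *PipelineSignal) RepoFullName() string {
	return s.RepoOwner + "/" + s.RepoName
}

// HasUnresolvedThreads reports whether any review thread is still open.
func (s *PipelineSignal) HasUnresolvedThreads() bool {
	return s.ThreadsTotal > s.ThreadsResolved
}

func main() {
	s := &PipelineSignal{RepoOwner: "core", RepoName: "go-scm", ThreadsTotal: 3, ThreadsResolved: 2}
	fmt.Println(s.RepoFullName(), s.HasUnresolvedThreads()) // core/go-scm true
}
```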
ActionResult
type ActionResult struct {
Action string // Handler name
RepoOwner string
RepoName string
EpicNumber int
ChildNumber int
PRNumber int
Success bool
Error string
Timestamp time.Time
Duration time.Duration
Cycle int // Poll cycle number
}
Interfaces
// JobSource discovers actionable work from an external system.
type JobSource interface {
Name() string
Poll(ctx context.Context) ([]*PipelineSignal, error)
Report(ctx context.Context, result *ActionResult) error
}
// JobHandler processes a single pipeline signal.
type JobHandler interface {
Name() string
Match(signal *PipelineSignal) bool
Execute(ctx context.Context, signal *PipelineSignal) (*ActionResult, error)
}
Poller
The Poller is the main poll-dispatch loop:
poller := jobrunner.NewPoller(jobrunner.PollerConfig{
Sources: []jobrunner.JobSource{source},
Handlers: []jobrunner.JobHandler{handler1, handler2},
Journal: journal,
PollInterval: 60 * time.Second, // Default: 60s
DryRun: false,
})
// Blocking loop -- runs one cycle immediately, then on each tick
err := poller.Run(ctx)
// Or run a single cycle
err = poller.RunOnce(ctx)
Dynamic Registration
Sources and handlers can be added at runtime:
poller.AddSource(newSource)
poller.AddHandler(newHandler)
Controls
poller.SetDryRun(true) // Toggle dry-run mode
poller.DryRun() // Check current mode
poller.Cycle() // Get completed cycle count
Signal Matching
The poller uses first-match semantics: for each signal, handlers are checked in registration order, and the first handler whose Match() returns true is executed. This means handler ordering matters.
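A small sketch makes the consequence of first-match ordering concrete. The matcher type is a cut-down stand-in for JobHandler, and the handler names and the catch_all entry are hypothetical.

```go
package main

import "fmt"

// PipelineSignal is trimmed to the fields used by the demo matchers.
type PipelineSignal struct {
	PRState     string
	NeedsCoding bool
}

// matcher keeps only the parts of a handler that dispatch order depends on.
type matcher struct {
	name  string
	match func(*PipelineSignal) bool
}

// firstMatch returns the name of the first handler that accepts the signal,
// or "" if none does. Later handlers are never consulted once one matches.
func firstMatch(handlers []matcher, s *PipelineSignal) string {
	for _, h := range handlers {
		if h.match(s) {
			return h.name
		}
	}
	return ""
}

// demoHandlers returns handlers in priority order; the last one accepts everything.
func demoHandlers() []matcher {
	return []matcher{
		{"dispatch", func(s *PipelineSignal) bool { return s.NeedsCoding }},
		{"tick_parent", func(s *PipelineSignal) bool { return s.PRState == "MERGED" }},
		{"catch_all", func(*PipelineSignal) bool { return true }},
	}
}

func main() {
	hs := demoHandlers()
	fmt.Println(firstMatch(hs, &PipelineSignal{PRState: "MERGED"}))   // tick_parent
	fmt.Println(firstMatch(hs, &PipelineSignal{NeedsCoding: true}))   // dispatch
	fmt.Println(firstMatch(hs, &PipelineSignal{PRState: "OPEN"}))     // catch_all
}
```

If catch_all were registered first, it would shadow every other handler, which is why broad handlers belong at the end of the list.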
Forgejo Source
Package: forge.lthn.ai/core/go-scm/jobrunner/forgejo
The built-in source that polls Forgejo for epic issues and discovers actionable child tasks.
How It Works
- Lists all open issues with the epic label across configured repositories
- Parses epic bodies for checklist items: "- [ ] #42" (unchecked) and "- [x] #42" (checked)
- For each unchecked child:
  - Looks for a linked PR (any PR whose body references #<childNumber>)
  - If a PR exists: builds a signal with PR state, mergeability, and check status
  - If no PR exists but the child is assigned: builds a NeedsCoding signal for dispatch
import (
forgejoSource "forge.lthn.ai/core/go-scm/jobrunner/forgejo"
"forge.lthn.ai/core/go-scm/forge"
)
client, _ := forge.NewFromConfig("", "")
source := forgejoSource.New(forgejoSource.Config{
Repos: []string{"core/go-scm", "core/go"},
}, client)
The source also implements Report(), which posts action results as comments on the epic issue.
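The checklist parsing step described under How It Works can be sketched with a regular expression. The pattern and the parseChecklist function are assumptions for illustration; the source's real parser may accept a different set of variants.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// checklistItem matches lines such as "- [ ] #42" and "- [x] #42".
var checklistItem = regexp.MustCompile(`(?m)^\s*- \[([ xX])\] #(\d+)`)

// parseChecklist splits the child issue numbers found in an epic body
// into unchecked and checked lists, preserving their order.
func parseChecklist(body string) (unchecked, checked []int) {
	for _, m := range checklistItem.FindAllStringSubmatch(body, -1) {
		n, err := strconv.Atoi(m[2])
		if err != nil {
			continue // number does not fit in an int; skip it
		}
		if m[1] == " " {
			unchecked = append(unchecked, n)
		} else {
			checked = append(checked, n)
		}
	}
	return unchecked, checked
}

func main() {
	body := "## Tasks\n- [ ] #42\n- [x] #43\n- [ ] #44 follow-up\nSee #45\n"
	unchecked, checked := parseChecklist(body)
	fmt.Println(unchecked, checked) // [42 44] [43]
}
```

Plain mentions such as "See #45" are ignored because they are not checklist items.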
Built-in Handlers
Package: forge.lthn.ai/core/go-scm/jobrunner/handlers
DispatchHandler
Dispatches coding work to remote agent machines via SSH. See AgentCI-and-Clotho for the full orchestration protocol.
| Match Condition | Action |
|---|---|
| NeedsCoding == true and assignee is a known agent | Creates a JSON ticket and transfers it to the agent's queue via SSH |
Workflow:
- Resolves assignee to agent config via agentci.Spinner.FindByForgejoUser
- Checks for duplicate dispatch (skips if the in-progress or agent-completed label is present)
- Adds the in-progress label, removes the agent-ready label
- Determines Clotho run mode (standard or dual)
- Builds DispatchTicket JSON with repo, issue, model, and runner details
- Transfers the ticket via SSH with cat > path (stdin piping, no command injection)
- Transfers the Forge token separately as a .env file with 0600 permissions
- On failure: adds the agent-failed label, removes in-progress, posts an error comment
handler := handlers.NewDispatchHandler(forgeClient, forgeURL, token, spinner)
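The "cat > path over stdin" transfer in the workflow above can be sketched with os/exec. The point is that the ticket bytes travel on stdin and never appear in the remote command line. Everything named here is hypothetical: the ticket struct and its JSON field names, the host, the queue path, and the shellQuote helper. The real DispatchTicket has more fields.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"os/exec"
	"strings"
)

// ticket is a hypothetical, trimmed stand-in for DispatchTicket.
type ticket struct {
	RepoOwner string `json:"repo_owner"`
	RepoName  string `json:"repo_name"`
	Issue     int    `json:"issue"`
}

// shellQuote wraps s in single quotes for a POSIX shell, escaping embedded quotes.
func shellQuote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'"
}

// sshWriteCmd builds (but does not run) an ssh command that writes payload
// to remotePath. Only the quoted path reaches the remote shell; the payload
// is streamed on stdin.
func sshWriteCmd(host, remotePath string, payload []byte) *exec.Cmd {
	cmd := exec.Command("ssh", host, "cat > "+shellQuote(remotePath))
	cmd.Stdin = bytes.NewReader(payload)
	return cmd
}

func main() {
	payload, err := json.Marshal(ticket{RepoOwner: "core", RepoName: "go-scm", Issue: 42})
	if err != nil {
		panic(err)
	}
	cmd := sshWriteCmd("agent@build-host", "/home/agent/queue/ticket-42.json", payload)
	fmt.Println(cmd.Args)
	// To actually transfer the ticket, call cmd.Run() and check its error.
}
```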
CompletionHandler
Manages issue state when an agent finishes work.
| Match Condition | Action |
|---|---|
| Type == "agent_completion" | Removes in-progress label; adds agent-completed (success) or agent-failed (failure); posts comment |
handler := handlers.NewCompletionHandler(forgeClient)
EnableAutoMergeHandler
Merges PRs that are ready using squash strategy.
| Match Condition | Action |
|---|---|
| PR is open, not draft, mergeable, checks passing, no unresolved threads | Squash-merges the PR |
handler := handlers.NewEnableAutoMergeHandler(forgeClient)
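The match condition in the table translates into a single predicate over PipelineSignal fields. This is a sketch assuming the string values listed in the struct comments; readyToMerge is a hypothetical name, not the handler's actual Match method.

```go
package main

import "fmt"

// PipelineSignal is trimmed to the fields the merge decision reads.
type PipelineSignal struct {
	PRState         string
	IsDraft         bool
	Mergeable       string
	CheckStatus     string
	ThreadsTotal    int
	ThreadsResolved int
}

// readyToMerge mirrors the table row: open, not draft, mergeable,
// checks passing, and no unresolved review threads.
func readyToMerge(s *PipelineSignal) bool {
	return s.PRState == "OPEN" &&
		!s.IsDraft &&
		s.Mergeable == "MERGEABLE" &&
		s.CheckStatus == "SUCCESS" &&
		s.ThreadsTotal <= s.ThreadsResolved
}

func main() {
	s := &PipelineSignal{PRState: "OPEN", Mergeable: "MERGEABLE", CheckStatus: "SUCCESS", ThreadsTotal: 2, ThreadsResolved: 2}
	fmt.Println(readyToMerge(s)) // true
	s.IsDraft = true
	fmt.Println(readyToMerge(s)) // false
}
```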
PublishDraftHandler
Marks draft PRs as ready for review once checks pass.
| Match Condition | Action |
|---|---|
| PR is draft, open, and checks are SUCCESS | Clears draft status via raw HTTP PATCH |
handler := handlers.NewPublishDraftHandler(forgeClient)
DismissReviewsHandler
Dismisses stale "request changes" reviews on a PR.
| Match Condition | Action |
|---|---|
| PR is open and has unresolved threads | Dismisses stale REQUEST_CHANGES reviews |
handler := handlers.NewDismissReviewsHandler(forgeClient)
SendFixCommandHandler
Posts comments asking for fixes on problematic PRs.
| Match Condition | Action |
|---|---|
| PR is open and has merge conflicts | Posts "Can you fix the merge conflict?" |
| PR is open, has unresolved threads, and checks are failing | Posts "Can you fix the code reviews?" |
handler := handlers.NewSendFixCommandHandler(forgeClient)
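The two table rows can be read as a small decision function. The sketch below assumes that a merge conflict takes precedence when both rows apply, which follows the table order but is not stated by the source; fixMessage is a hypothetical name.

```go
package main

import "fmt"

// PipelineSignal is trimmed to the fields the decision reads.
type PipelineSignal struct {
	PRState         string
	Mergeable       string
	CheckStatus     string
	ThreadsTotal    int
	ThreadsResolved int
}

const (
	msgConflict = "Can you fix the merge conflict?"
	msgReviews  = "Can you fix the code reviews?"
)

// fixMessage returns the comment to post and whether the signal matches at all.
func fixMessage(s *PipelineSignal) (string, bool) {
	if s.PRState != "OPEN" {
		return "", false
	}
	if s.Mergeable == "CONFLICTING" {
		return msgConflict, true // assumed to win when both conditions hold
	}
	if s.ThreadsTotal > s.ThreadsResolved && s.CheckStatus == "FAILURE" {
		return msgReviews, true
	}
	return "", false
}

func main() {
	msg, ok := fixMessage(&PipelineSignal{PRState: "OPEN", Mergeable: "CONFLICTING"})
	fmt.Println(msg, ok) // Can you fix the merge conflict? true
}
```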
TickParentHandler
Ticks a child checkbox in the parent epic after the child's PR is merged.
| Match Condition | Action |
|---|---|
| PRState == "MERGED" | Replaces "- [ ] #N" with "- [x] #N" in epic body; closes the child issue |
handler := handlers.NewTickParentHandler(forgeClient)
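The checkbox replacement is a plain text edit on the epic body, with one subtlety: ticking #4 must not touch #42. A sketch that guards against this with a word boundary follows. tickChild is a hypothetical helper, and whether the real handler uses a regular expression is not stated by the source.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// tickChild replaces "- [ ] #n" with "- [x] #n" in body and reports
// whether anything changed. The trailing \b keeps #4 from matching #42.
func tickChild(body string, n int) (string, bool) {
	num := strconv.Itoa(n)
	re := regexp.MustCompile(`- \[ \] #` + num + `\b`)
	if !re.MatchString(body) {
		return body, false
	}
	return re.ReplaceAllString(body, "- [x] #"+num), true
}

func main() {
	body := "- [ ] #4\n- [ ] #42\n"
	updated, changed := tickChild(body, 4)
	fmt.Printf("%q %v\n", updated, changed) // "- [x] #4\n- [ ] #42\n" true
}
```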
Journal
The Journal writes audit entries to date-partitioned JSONL files with path-traversal protection:
journal, err := jobrunner.NewJournal("/var/log/agentci")
// Files are written as: /var/log/agentci/{owner}/{repo}/{date}.jsonl
// Example: /var/log/agentci/core/go-scm/2026-02-19.jsonl
Entry Structure
type JournalEntry struct {
Timestamp string // UTC ISO 8601
Epic int
Child int
PR int
Repo string // "owner/repo"
Action string // Handler name
Signals SignalSnapshot // PR state at time of action
Result ResultSnapshot // Action outcome
Cycle int // Poll cycle number
}
Security
The journal validates all path components to prevent path traversal:
- Owner and repo names must match ^[a-zA-Z0-9][a-zA-Z0-9._-]*$
- Resolved paths are verified to stay within the base directory
- Path separators, .., and empty strings are rejected
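The three checks above, together with the date-partitioned layout, can be sketched as one function. This is an illustration under assumptions: journalPath and exampleDay are hypothetical names, and the real Journal may order or implement the checks differently.

```go
package main

import (
	"fmt"
	"path/filepath"
	"regexp"
	"strings"
	"time"
)

// validName is the pattern quoted in the list above.
var validName = regexp.MustCompile(`^[a-zA-Z0-9][a-zA-Z0-9._-]*$`)

// exampleDay is a fixed date used only by the demo.
var exampleDay = time.Date(2026, time.February, 19, 12, 0, 0, 0, time.UTC)

// journalPath returns base/owner/repo/YYYY-MM-DD.jsonl, or an error if a
// component is unsafe or the result would leave the base directory.
func journalPath(base, owner, repo string, ts time.Time) (string, error) {
	for _, part := range []string{owner, repo} {
		// The pattern already rejects empty strings and separators;
		// ".." is checked explicitly because "a..b" would pass the pattern.
		if !validName.MatchString(part) || strings.Contains(part, "..") {
			return "", fmt.Errorf("invalid path component %q", part)
		}
	}
	base = filepath.Clean(base)
	p := filepath.Join(base, owner, repo, ts.UTC().Format("2006-01-02")+".jsonl")
	rel, err := filepath.Rel(base, p)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("path %q escapes base directory", p)
	}
	return p, nil
}

func main() {
	p, err := journalPath("/var/log/agentci", "core", "go-scm", exampleDay)
	fmt.Println(p, err)
	_, err = journalPath("/var/log/agentci", "../etc", "go-scm", exampleDay)
	fmt.Println(err)
}
```

The final filepath.Rel check is redundant once the components are validated, but keeping both layers means a future change to the pattern cannot silently reopen a traversal.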
Recommended Handler Order
Since the poller uses first-match semantics, handler registration order determines priority. The recommended order:
- DispatchHandler -- Dispatch new work to agents
- CompletionHandler -- Handle agent completion events
- TickParentHandler -- Tick merged children in epic
- PublishDraftHandler -- Publish draft PRs with passing checks
- DismissReviewsHandler -- Dismiss stale reviews
- EnableAutoMergeHandler -- Merge ready PRs
- SendFixCommandHandler -- Request fixes (lowest priority)
Full Example
import (
"context"
"time"

"forge.lthn.ai/core/go-scm/agentci"
"forge.lthn.ai/core/go-scm/forge"
"forge.lthn.ai/core/go-scm/jobrunner"
forgejoSource "forge.lthn.ai/core/go-scm/jobrunner/forgejo"
"forge.lthn.ai/core/go-scm/jobrunner/handlers"
)
// Set up clients
forgeClient, _ := forge.NewFromConfig("", "")
url, token, _ := forge.ResolveConfig("", "")
// Set up AgentCI
cfg, _ := config.New()
agents, _ := agentci.LoadActiveAgents(cfg)
clothoCfg, _ := agentci.LoadClothoConfig(cfg)
spinner := agentci.NewSpinner(clothoCfg, agents)
// Set up journal
journal, _ := jobrunner.NewJournal("/var/log/agentci")
// Set up source
source := forgejoSource.New(forgejoSource.Config{
Repos: []string{"core/go-scm", "core/go"},
}, forgeClient)
// Build and run poller
poller := jobrunner.NewPoller(jobrunner.PollerConfig{
Sources: []jobrunner.JobSource{source},
Handlers: []jobrunner.JobHandler{
handlers.NewDispatchHandler(forgeClient, url, token, spinner),
handlers.NewCompletionHandler(forgeClient),
handlers.NewTickParentHandler(forgeClient),
handlers.NewPublishDraftHandler(forgeClient),
handlers.NewDismissReviewsHandler(forgeClient),
handlers.NewEnableAutoMergeHandler(forgeClient),
handlers.NewSendFixCommandHandler(forgeClient),
},
Journal: journal,
PollInterval: 60 * time.Second,
})
poller.Run(ctx)
See Also
- AgentCI-and-Clotho -- Agent orchestration used by DispatchHandler
- Forge-Client -- Forgejo API client used by all handlers
- Data-Collection -- Data collection subsystem (separate from job runner)
- Home -- Package overview