feat(jobrunner): add agent dispatch handler and queue runner

Dispatch handler matches child issues that need coding (no PR yet,
assigned to a known agent) and SCPs ticket JSON to the agent's
queue directory via SSH. Includes dedup across queue/active/done
and posts dispatch comments on issues.

- Extend PipelineSignal with NeedsCoding, Assignee, IssueTitle, IssueBody
- Extend ForgejoSource to emit signals for unstarted children
- Add DispatchHandler with Match/Execute (SCP ticket delivery)
- Add agent-runner.sh cron-based queue runner for agent machines
- Wire dispatch handler into headless mode

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude 2026-02-09 10:10:08 +00:00
parent ccead55c0e
commit b8b0a2e5b8
No known key found for this signature in database
GPG key ID: AF404715446AEB41
6 changed files with 402 additions and 1 deletions

View file

@ -42,7 +42,8 @@ func startHeadless() {
}
// Forge client
forgeClient, err := forge.NewFromConfig("", "")
forgeURL, forgeToken, _ := forge.ResolveConfig("", "")
forgeClient, err := forge.New(forgeURL, forgeToken)
if err != nil {
log.Fatalf("Failed to create forge client: %v", err)
}
@ -64,6 +65,12 @@ func startHeadless() {
enableAutoMerge := handlers.NewEnableAutoMergeHandler(forgeClient)
tickParent := handlers.NewTickParentHandler(forgeClient)
// Agent dispatch — maps Forgejo usernames to SSH targets.
agentTargets := map[string]handlers.AgentTarget{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "/home/claude/ai-work/queue"},
}
dispatch := handlers.NewDispatchHandler(forgeClient, forgeURL, forgeToken, agentTargets)
// Build poller
poller := jobrunner.NewPoller(jobrunner.PollerConfig{
Sources: []jobrunner.JobSource{source},
@ -73,6 +80,7 @@ func startHeadless() {
dismissReviews,
enableAutoMerge,
tickParent,
dispatch, // Last — only matches NeedsCoding signals
},
Journal: journal,
PollInterval: 60 * time.Second,

View file

@ -115,7 +115,27 @@ func (s *ForgejoSource) pollRepo(_ context.Context, owner, repo string) ([]*jobr
unchecked, _ := parseEpicChildren(epic.Body)
for _, childNum := range unchecked {
pr := findLinkedPR(prs, childNum)
if pr == nil {
// No PR yet — check if the child issue is assigned (needs coding).
childIssue, err := s.forge.GetIssue(owner, repo, int64(childNum))
if err != nil {
log.Error("fetch child issue failed", "repo", owner+"/"+repo, "issue", childNum, "err", err)
continue
}
if len(childIssue.Assignees) > 0 && childIssue.Assignees[0].UserName != "" {
sig := &jobrunner.PipelineSignal{
EpicNumber: epic.Number,
ChildNumber: childNum,
RepoOwner: owner,
RepoName: repo,
NeedsCoding: true,
Assignee: childIssue.Assignees[0].UserName,
IssueTitle: childIssue.Title,
IssueBody: childIssue.Body,
}
signals = append(signals, sig)
}
continue
}

View file

@ -0,0 +1,190 @@
package handlers
import (
"context"
"encoding/json"
"fmt"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/host-uk/core/pkg/forge"
"github.com/host-uk/core/pkg/jobrunner"
"github.com/host-uk/core/pkg/log"
)
// AgentTarget maps a Forgejo username to an SSH-reachable agent machine.
type AgentTarget struct {
Host string // SSH destination (e.g., "claude@192.168.0.201")
QueueDir string // Remote queue directory (e.g., "~/ai-work/queue")
}
// DispatchTicket is the JSON payload written to the agent's queue.
type DispatchTicket struct {
ID string `json:"id"`
RepoOwner string `json:"repo_owner"`
RepoName string `json:"repo_name"`
IssueNumber int `json:"issue_number"`
IssueTitle string `json:"issue_title"`
IssueBody string `json:"issue_body"`
TargetBranch string `json:"target_branch"`
EpicNumber int `json:"epic_number"`
ForgeURL string `json:"forge_url"`
ForgeToken string `json:"forge_token"`
CreatedAt string `json:"created_at"`
}
// DispatchHandler dispatches coding work to remote agent machines via SSH/SCP.
type DispatchHandler struct {
forge *forge.Client
forgeURL string
token string
agents map[string]AgentTarget
}
// NewDispatchHandler creates a handler that dispatches tickets to agent machines.
func NewDispatchHandler(client *forge.Client, forgeURL, token string, agents map[string]AgentTarget) *DispatchHandler {
return &DispatchHandler{
forge: client,
forgeURL: forgeURL,
token: token,
agents: agents,
}
}
// Name returns the handler identifier.
func (h *DispatchHandler) Name() string {
return "dispatch"
}
// Match returns true for signals where a child issue needs coding (no PR yet)
// and the assignee is a known agent.
func (h *DispatchHandler) Match(signal *jobrunner.PipelineSignal) bool {
if !signal.NeedsCoding {
return false
}
_, ok := h.agents[signal.Assignee]
return ok
}
// Execute creates a ticket JSON and SCPs it to the agent's queue directory.
func (h *DispatchHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
start := time.Now()
agent, ok := h.agents[signal.Assignee]
if !ok {
return nil, fmt.Errorf("unknown agent: %s", signal.Assignee)
}
// Determine target branch (default to repo default).
targetBranch := "new" // TODO: resolve from epic or repo default
ticket := DispatchTicket{
ID: fmt.Sprintf("%s-%s-%d-%d", signal.RepoOwner, signal.RepoName, signal.ChildNumber, time.Now().Unix()),
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
IssueNumber: signal.ChildNumber,
IssueTitle: signal.IssueTitle,
IssueBody: signal.IssueBody,
TargetBranch: targetBranch,
EpicNumber: signal.EpicNumber,
ForgeURL: h.forgeURL,
ForgeToken: h.token,
CreatedAt: time.Now().UTC().Format(time.RFC3339),
}
ticketJSON, err := json.MarshalIndent(ticket, "", " ")
if err != nil {
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: false,
Error: fmt.Sprintf("marshal ticket: %v", err),
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// Check if ticket already exists on agent (dedup).
ticketName := fmt.Sprintf("ticket-%s-%s-%d.json", signal.RepoOwner, signal.RepoName, signal.ChildNumber)
if h.ticketExists(agent, ticketName) {
log.Info("ticket already queued, skipping", "ticket", ticketName, "agent", signal.Assignee)
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: true,
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// SCP ticket to agent queue.
remotePath := filepath.Join(agent.QueueDir, ticketName)
if err := h.scpTicket(ctx, agent.Host, remotePath, ticketJSON); err != nil {
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: false,
Error: fmt.Sprintf("scp ticket: %v", err),
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// Comment on issue.
comment := fmt.Sprintf("Dispatched to **%s** agent queue.", signal.Assignee)
_ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), comment)
return &jobrunner.ActionResult{
Action: "dispatch",
RepoOwner: signal.RepoOwner,
RepoName: signal.RepoName,
EpicNumber: signal.EpicNumber,
ChildNumber: signal.ChildNumber,
Success: true,
Timestamp: time.Now(),
Duration: time.Since(start),
}, nil
}
// scpTicket writes ticket data to a remote path via SSH.
func (h *DispatchHandler) scpTicket(ctx context.Context, host, remotePath string, data []byte) error {
// Use ssh + cat instead of scp for piping stdin.
cmd := exec.CommandContext(ctx, "ssh",
"-o", "StrictHostKeyChecking=accept-new",
"-o", "ConnectTimeout=10",
host,
fmt.Sprintf("cat > %s", remotePath),
)
cmd.Stdin = strings.NewReader(string(data))
output, err := cmd.CombinedOutput()
if err != nil {
return log.E("dispatch.scp", fmt.Sprintf("ssh to %s failed: %s", host, string(output)), err)
}
return nil
}
// ticketExists checks if a ticket file already exists in queue, active, or done.
func (h *DispatchHandler) ticketExists(agent AgentTarget, ticketName string) bool {
cmd := exec.Command("ssh",
"-o", "StrictHostKeyChecking=accept-new",
"-o", "ConnectTimeout=10",
agent.Host,
fmt.Sprintf("test -f %s/%s || test -f %s/../active/%s || test -f %s/../done/%s",
agent.QueueDir, ticketName,
agent.QueueDir, ticketName,
agent.QueueDir, ticketName),
)
return cmd.Run() == nil
}

View file

@ -0,0 +1,53 @@
package handlers
import (
"testing"
"github.com/host-uk/core/pkg/jobrunner"
"github.com/stretchr/testify/assert"
)
func TestDispatch_Match_Good_NeedsCoding(t *testing.T) {
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
})
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "darbs-claude",
}
assert.True(t, h.Match(sig))
}
func TestDispatch_Match_Bad_HasPR(t *testing.T) {
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
})
sig := &jobrunner.PipelineSignal{
NeedsCoding: false,
PRNumber: 7,
Assignee: "darbs-claude",
}
assert.False(t, h.Match(sig))
}
func TestDispatch_Match_Bad_UnknownAgent(t *testing.T) {
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
})
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "unknown-user",
}
assert.False(t, h.Match(sig))
}
func TestDispatch_Match_Bad_NotAssigned(t *testing.T) {
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
})
sig := &jobrunner.PipelineSignal{
NeedsCoding: true,
Assignee: "",
}
assert.False(t, h.Match(sig))
}

View file

@ -22,6 +22,10 @@ type PipelineSignal struct {
LastCommitSHA string
LastCommitAt time.Time
LastReviewAt time.Time
NeedsCoding bool // true if child has no PR (work not started)
Assignee string // issue assignee username (for dispatch)
IssueTitle string // child issue title (for dispatch prompt)
IssueBody string // child issue body (for dispatch prompt)
}
// RepoFullName returns "owner/repo".

126
scripts/agent-runner.sh Executable file
View file

@ -0,0 +1,126 @@
#!/bin/bash
# agent-runner.sh — One-at-a-time queue runner for Claude Code agents.
# Deployed to agent machines, triggered by cron every 5 minutes.
#
# Usage: */5 * * * * ~/ai-work/agent-runner.sh >> ~/ai-work/logs/runner.log 2>&1
set -euo pipefail
WORK_DIR="${HOME}/ai-work"
QUEUE_DIR="${WORK_DIR}/queue"
ACTIVE_DIR="${WORK_DIR}/active"
DONE_DIR="${WORK_DIR}/done"
LOG_DIR="${WORK_DIR}/logs"
LOCK_FILE="${WORK_DIR}/.runner.lock"
# Ensure directories exist.
mkdir -p "$QUEUE_DIR" "$ACTIVE_DIR" "$DONE_DIR" "$LOG_DIR"
# --- 1. Check lock (is another run active?) ---
if [ -f "$LOCK_FILE" ]; then
PID=$(cat "$LOCK_FILE" 2>/dev/null || echo "")
if [ -n "$PID" ] && kill -0 "$PID" 2>/dev/null; then
echo "$(date -Iseconds) Runner already active (PID $PID), exiting."
exit 0
fi
echo "$(date -Iseconds) Removing stale lock (PID $PID)."
rm -f "$LOCK_FILE"
fi
# --- 2. Check credits ---
# Parse remaining usage from claude. If under 5% remaining, skip.
if command -v claude &>/dev/null; then
USAGE_OUTPUT=$(claude --output-format json -p "Reply with just the word OK" 2>/dev/null | head -1 || echo "")
# Fallback: if we can't check, proceed anyway.
fi
# --- 3. Pick oldest ticket ---
TICKET=$(find "$QUEUE_DIR" -name 'ticket-*.json' -type f 2>/dev/null | sort | head -1)
if [ -z "$TICKET" ]; then
exit 0 # No work
fi
TICKET_BASENAME=$(basename "$TICKET")
echo "$(date -Iseconds) Processing ticket: $TICKET_BASENAME"
# --- 4. Lock ---
echo $$ > "$LOCK_FILE"
cleanup() {
rm -f "$LOCK_FILE"
echo "$(date -Iseconds) Lock released."
}
trap cleanup EXIT
# --- 5. Move to active ---
mv "$TICKET" "$ACTIVE_DIR/"
TICKET_FILE="$ACTIVE_DIR/$TICKET_BASENAME"
# --- 6. Extract ticket data ---
REPO_OWNER=$(jq -r .repo_owner "$TICKET_FILE")
REPO_NAME=$(jq -r .repo_name "$TICKET_FILE")
ISSUE_NUM=$(jq -r .issue_number "$TICKET_FILE")
ISSUE_TITLE=$(jq -r .issue_title "$TICKET_FILE")
ISSUE_BODY=$(jq -r .issue_body "$TICKET_FILE")
TARGET_BRANCH=$(jq -r .target_branch "$TICKET_FILE")
FORGE_URL=$(jq -r .forge_url "$TICKET_FILE")
FORGE_TOKEN=$(jq -r .forge_token "$TICKET_FILE")
echo "$(date -Iseconds) Issue: ${REPO_OWNER}/${REPO_NAME}#${ISSUE_NUM} - ${ISSUE_TITLE}"
# --- 7. Clone or update repo ---
JOB_DIR="$WORK_DIR/jobs/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}"
REPO_DIR="$JOB_DIR/$REPO_NAME"
mkdir -p "$JOB_DIR"
CLONE_URL="https://darbs-claude:${FORGE_TOKEN}@${FORGE_URL#https://}/${REPO_OWNER}/${REPO_NAME}.git"
if [ -d "$REPO_DIR/.git" ]; then
echo "$(date -Iseconds) Updating existing clone..."
cd "$REPO_DIR"
git fetch origin
git checkout "$TARGET_BRANCH" 2>/dev/null || git checkout -b "$TARGET_BRANCH" "origin/$TARGET_BRANCH"
git pull origin "$TARGET_BRANCH"
else
echo "$(date -Iseconds) Cloning repo..."
git clone -b "$TARGET_BRANCH" "$CLONE_URL" "$REPO_DIR"
cd "$REPO_DIR"
fi
# --- 8. Build prompt ---
PROMPT="You are working on issue #${ISSUE_NUM} in ${REPO_OWNER}/${REPO_NAME}.
Title: ${ISSUE_TITLE}
Description:
${ISSUE_BODY}
The repo is cloned at the current directory on branch '${TARGET_BRANCH}'.
Create a feature branch from '${TARGET_BRANCH}', make minimal targeted changes, commit referencing #${ISSUE_NUM}, and push.
Then create a PR targeting '${TARGET_BRANCH}' using the forgejo MCP tools or git push."
# --- 9. Run Claude ---
LOG_FILE="$LOG_DIR/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}.log"
echo "$(date -Iseconds) Running claude..."
echo "$PROMPT" | claude -p \
--dangerously-skip-permissions \
--output-format text \
> "$LOG_FILE" 2>&1
EXIT_CODE=$?
echo "$(date -Iseconds) Claude exited with code: $EXIT_CODE"
# --- 10. Move to done ---
mv "$TICKET_FILE" "$DONE_DIR/"
# --- 11. Report result back to Forgejo ---
if [ $EXIT_CODE -eq 0 ]; then
COMMENT="Agent completed work on #${ISSUE_NUM}. Exit code: 0."
else
COMMENT="Agent failed on #${ISSUE_NUM} (exit code: ${EXIT_CODE}). Check logs on agent machine."
fi
curl -s -X POST "${FORGE_URL}/api/v1/repos/${REPO_OWNER}/${REPO_NAME}/issues/${ISSUE_NUM}/comments" \
-H "Authorization: token $FORGE_TOKEN" \
-H "Content-Type: application/json" \
-d "$(jq -n --arg body "$COMMENT" '{body: $body}')" \
> /dev/null 2>&1 || true
echo "$(date -Iseconds) Done: $TICKET_BASENAME (exit: $EXIT_CODE)"