From 849695fe3986ef1c978eb7ffeb3dd1c899fab79b Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 9 Feb 2026 10:10:08 +0000 Subject: [PATCH] feat(jobrunner): add agent dispatch handler and queue runner Dispatch handler matches child issues that need coding (no PR yet, assigned to a known agent) and SCPs ticket JSON to the agent's queue directory via SSH. Includes dedup across queue/active/done and posts dispatch comments on issues. - Extend PipelineSignal with NeedsCoding, Assignee, IssueTitle, IssueBody - Extend ForgejoSource to emit signals for unstarted children - Add DispatchHandler with Match/Execute (SCP ticket delivery) - Add agent-runner.sh cron-based queue runner for agent machines - Wire dispatch handler into headless mode Co-Authored-By: Claude Opus 4.6 --- internal/core-ide/headless.go | 10 +- pkg/jobrunner/forgejo/source.go | 20 +++ pkg/jobrunner/handlers/dispatch.go | 190 ++++++++++++++++++++++++ pkg/jobrunner/handlers/dispatch_test.go | 53 +++++++ pkg/jobrunner/types.go | 4 + scripts/agent-runner.sh | 126 ++++++++++++++++ 6 files changed, 402 insertions(+), 1 deletion(-) create mode 100644 pkg/jobrunner/handlers/dispatch.go create mode 100644 pkg/jobrunner/handlers/dispatch_test.go create mode 100755 scripts/agent-runner.sh diff --git a/internal/core-ide/headless.go b/internal/core-ide/headless.go index d744933a..929b6b61 100644 --- a/internal/core-ide/headless.go +++ b/internal/core-ide/headless.go @@ -42,7 +42,8 @@ func startHeadless() { } // Forge client - forgeClient, err := forge.NewFromConfig("", "") + forgeURL, forgeToken, _ := forge.ResolveConfig("", "") + forgeClient, err := forge.New(forgeURL, forgeToken) if err != nil { log.Fatalf("Failed to create forge client: %v", err) } @@ -64,6 +65,12 @@ func startHeadless() { enableAutoMerge := handlers.NewEnableAutoMergeHandler(forgeClient) tickParent := handlers.NewTickParentHandler(forgeClient) + // Agent dispatch — maps Forgejo usernames to SSH targets. + agentTargets := map[string]handlers.AgentTarget{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "/home/claude/ai-work/queue"}, + } + dispatch := handlers.NewDispatchHandler(forgeClient, forgeURL, forgeToken, agentTargets) + // Build poller poller := jobrunner.NewPoller(jobrunner.PollerConfig{ Sources: []jobrunner.JobSource{source}, @@ -73,6 +80,7 @@ func startHeadless() { dismissReviews, enableAutoMerge, tickParent, + dispatch, // Last — only matches NeedsCoding signals }, Journal: journal, PollInterval: 60 * time.Second, diff --git a/pkg/jobrunner/forgejo/source.go b/pkg/jobrunner/forgejo/source.go index 74fe6d62..0df0f13a 100644 --- a/pkg/jobrunner/forgejo/source.go +++ b/pkg/jobrunner/forgejo/source.go @@ -115,7 +115,27 @@ func (s *ForgejoSource) pollRepo(_ context.Context, owner, repo string) ([]*jobr unchecked, _ := parseEpicChildren(epic.Body) for _, childNum := range unchecked { pr := findLinkedPR(prs, childNum) + if pr == nil { + // No PR yet — check if the child issue is assigned (needs coding). + childIssue, err := s.forge.GetIssue(owner, repo, int64(childNum)) + if err != nil { + log.Error("fetch child issue failed", "repo", owner+"/"+repo, "issue", childNum, "err", err) + continue + } + if len(childIssue.Assignees) > 0 && childIssue.Assignees[0].UserName != "" { + sig := &jobrunner.PipelineSignal{ + EpicNumber: epic.Number, + ChildNumber: childNum, + RepoOwner: owner, + RepoName: repo, + NeedsCoding: true, + Assignee: childIssue.Assignees[0].UserName, + IssueTitle: childIssue.Title, + IssueBody: childIssue.Body, + } + signals = append(signals, sig) + } continue } diff --git a/pkg/jobrunner/handlers/dispatch.go b/pkg/jobrunner/handlers/dispatch.go new file mode 100644 index 00000000..c32340fa --- /dev/null +++ b/pkg/jobrunner/handlers/dispatch.go @@ -0,0 +1,190 @@ +package handlers + +import ( + "context" + "encoding/json" + "fmt" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/host-uk/core/pkg/forge" + "github.com/host-uk/core/pkg/jobrunner" + "github.com/host-uk/core/pkg/log" +) + +// AgentTarget maps a Forgejo username to an SSH-reachable agent machine. +type AgentTarget struct { + Host string // SSH destination (e.g., "claude@192.168.0.201") + QueueDir string // Remote queue directory (e.g., "~/ai-work/queue") +} + +// DispatchTicket is the JSON payload written to the agent's queue. +type DispatchTicket struct { + ID string `json:"id"` + RepoOwner string `json:"repo_owner"` + RepoName string `json:"repo_name"` + IssueNumber int `json:"issue_number"` + IssueTitle string `json:"issue_title"` + IssueBody string `json:"issue_body"` + TargetBranch string `json:"target_branch"` + EpicNumber int `json:"epic_number"` + ForgeURL string `json:"forge_url"` + ForgeToken string `json:"forge_token"` + CreatedAt string `json:"created_at"` +} + +// DispatchHandler dispatches coding work to remote agent machines via SSH/SCP. +type DispatchHandler struct { + forge *forge.Client + forgeURL string + token string + agents map[string]AgentTarget +} + +// NewDispatchHandler creates a handler that dispatches tickets to agent machines. +func NewDispatchHandler(client *forge.Client, forgeURL, token string, agents map[string]AgentTarget) *DispatchHandler { + return &DispatchHandler{ + forge: client, + forgeURL: forgeURL, + token: token, + agents: agents, + } +} + +// Name returns the handler identifier. +func (h *DispatchHandler) Name() string { + return "dispatch" +} + +// Match returns true for signals where a child issue needs coding (no PR yet) +// and the assignee is a known agent. +func (h *DispatchHandler) Match(signal *jobrunner.PipelineSignal) bool { + if !signal.NeedsCoding { + return false + } + _, ok := h.agents[signal.Assignee] + return ok +} + +// Execute creates a ticket JSON and SCPs it to the agent's queue directory. +func (h *DispatchHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) { + start := time.Now() + + agent, ok := h.agents[signal.Assignee] + if !ok { + return nil, fmt.Errorf("unknown agent: %s", signal.Assignee) + } + + // Determine target branch (default to repo default). + targetBranch := "new" // TODO: resolve from epic or repo default + + ticket := DispatchTicket{ + ID: fmt.Sprintf("%s-%s-%d-%d", signal.RepoOwner, signal.RepoName, signal.ChildNumber, time.Now().Unix()), + RepoOwner: signal.RepoOwner, + RepoName: signal.RepoName, + IssueNumber: signal.ChildNumber, + IssueTitle: signal.IssueTitle, + IssueBody: signal.IssueBody, + TargetBranch: targetBranch, + EpicNumber: signal.EpicNumber, + ForgeURL: h.forgeURL, + ForgeToken: h.token, + CreatedAt: time.Now().UTC().Format(time.RFC3339), + } + + ticketJSON, err := json.MarshalIndent(ticket, "", " ") + if err != nil { + return &jobrunner.ActionResult{ + Action: "dispatch", + RepoOwner: signal.RepoOwner, + RepoName: signal.RepoName, + EpicNumber: signal.EpicNumber, + ChildNumber: signal.ChildNumber, + Success: false, + Error: fmt.Sprintf("marshal ticket: %v", err), + Timestamp: time.Now(), + Duration: time.Since(start), + }, nil + } + + // Check if ticket already exists on agent (dedup). + ticketName := fmt.Sprintf("ticket-%s-%s-%d.json", signal.RepoOwner, signal.RepoName, signal.ChildNumber) + if h.ticketExists(agent, ticketName) { + log.Info("ticket already queued, skipping", "ticket", ticketName, "agent", signal.Assignee) + return &jobrunner.ActionResult{ + Action: "dispatch", + RepoOwner: signal.RepoOwner, + RepoName: signal.RepoName, + EpicNumber: signal.EpicNumber, + ChildNumber: signal.ChildNumber, + Success: true, + Timestamp: time.Now(), + Duration: time.Since(start), + }, nil + } + + // SCP ticket to agent queue. + remotePath := filepath.Join(agent.QueueDir, ticketName) + if err := h.scpTicket(ctx, agent.Host, remotePath, ticketJSON); err != nil { + return &jobrunner.ActionResult{ + Action: "dispatch", + RepoOwner: signal.RepoOwner, + RepoName: signal.RepoName, + EpicNumber: signal.EpicNumber, + ChildNumber: signal.ChildNumber, + Success: false, + Error: fmt.Sprintf("scp ticket: %v", err), + Timestamp: time.Now(), + Duration: time.Since(start), + }, nil + } + + // Comment on issue. + comment := fmt.Sprintf("Dispatched to **%s** agent queue.", signal.Assignee) + _ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), comment) + + return &jobrunner.ActionResult{ + Action: "dispatch", + RepoOwner: signal.RepoOwner, + RepoName: signal.RepoName, + EpicNumber: signal.EpicNumber, + ChildNumber: signal.ChildNumber, + Success: true, + Timestamp: time.Now(), + Duration: time.Since(start), + }, nil +} + +// scpTicket writes ticket data to a remote path via SSH. +func (h *DispatchHandler) scpTicket(ctx context.Context, host, remotePath string, data []byte) error { + // Use ssh + cat instead of scp for piping stdin. + cmd := exec.CommandContext(ctx, "ssh", + "-o", "StrictHostKeyChecking=accept-new", + "-o", "ConnectTimeout=10", + host, + fmt.Sprintf("cat > %s", remotePath), + ) + cmd.Stdin = strings.NewReader(string(data)) + + output, err := cmd.CombinedOutput() + if err != nil { + return log.E("dispatch.scp", fmt.Sprintf("ssh to %s failed: %s", host, string(output)), err) + } + return nil +} + +// ticketExists checks if a ticket file already exists in queue, active, or done. +func (h *DispatchHandler) ticketExists(agent AgentTarget, ticketName string) bool { + cmd := exec.Command("ssh", + "-o", "StrictHostKeyChecking=accept-new", + "-o", "ConnectTimeout=10", + agent.Host, + fmt.Sprintf("test -f %s/%s || test -f %s/../active/%s || test -f %s/../done/%s", + agent.QueueDir, ticketName, + agent.QueueDir, ticketName, + agent.QueueDir, ticketName), + ) + return cmd.Run() == nil +} diff --git a/pkg/jobrunner/handlers/dispatch_test.go b/pkg/jobrunner/handlers/dispatch_test.go new file mode 100644 index 00000000..3048a9e2 --- /dev/null +++ b/pkg/jobrunner/handlers/dispatch_test.go @@ -0,0 +1,53 @@ +package handlers + +import ( + "testing" + + "github.com/host-uk/core/pkg/jobrunner" + "github.com/stretchr/testify/assert" +) + +func TestDispatch_Match_Good_NeedsCoding(t *testing.T) { + h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + }) + sig := &jobrunner.PipelineSignal{ + NeedsCoding: true, + Assignee: "darbs-claude", + } + assert.True(t, h.Match(sig)) +} + +func TestDispatch_Match_Bad_HasPR(t *testing.T) { + h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + }) + sig := &jobrunner.PipelineSignal{ + NeedsCoding: false, + PRNumber: 7, + Assignee: "darbs-claude", + } + assert.False(t, h.Match(sig)) +} + +func TestDispatch_Match_Bad_UnknownAgent(t *testing.T) { + h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + }) + sig := &jobrunner.PipelineSignal{ + NeedsCoding: true, + Assignee: "unknown-user", + } + assert.False(t, h.Match(sig)) +} + +func TestDispatch_Match_Bad_NotAssigned(t *testing.T) { + h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{ + "darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"}, + }) + sig := &jobrunner.PipelineSignal{ + NeedsCoding: true, + Assignee: "", + } + assert.False(t, h.Match(sig)) +} diff --git a/pkg/jobrunner/types.go b/pkg/jobrunner/types.go index 3d04da2f..79cf6b5b 100644 --- a/pkg/jobrunner/types.go +++ b/pkg/jobrunner/types.go @@ -22,6 +22,10 @@ type PipelineSignal struct { LastCommitSHA string LastCommitAt time.Time LastReviewAt time.Time + NeedsCoding bool // true if child has no PR (work not started) + Assignee string // issue assignee username (for dispatch) + IssueTitle string // child issue title (for dispatch prompt) + IssueBody string // child issue body (for dispatch prompt) } // RepoFullName returns "owner/repo". diff --git a/scripts/agent-runner.sh b/scripts/agent-runner.sh new file mode 100755 index 00000000..f99009ae --- /dev/null +++ b/scripts/agent-runner.sh @@ -0,0 +1,126 @@ +#!/bin/bash +# agent-runner.sh — One-at-a-time queue runner for Claude Code agents. +# Deployed to agent machines, triggered by cron every 5 minutes. +# +# Usage: */5 * * * * ~/ai-work/agent-runner.sh >> ~/ai-work/logs/runner.log 2>&1 +set -euo pipefail + +WORK_DIR="${HOME}/ai-work" +QUEUE_DIR="${WORK_DIR}/queue" +ACTIVE_DIR="${WORK_DIR}/active" +DONE_DIR="${WORK_DIR}/done" +LOG_DIR="${WORK_DIR}/logs" +LOCK_FILE="${WORK_DIR}/.runner.lock" + +# Ensure directories exist. +mkdir -p "$QUEUE_DIR" "$ACTIVE_DIR" "$DONE_DIR" "$LOG_DIR" + +# --- 1. Check lock (is another run active?) --- +if [ -f "$LOCK_FILE" ]; then + PID=$(cat "$LOCK_FILE" 2>/dev/null || echo "") + if [ -n "$PID" ] && kill -0 "$PID" 2>/dev/null; then + echo "$(date -Iseconds) Runner already active (PID $PID), exiting." + exit 0 + fi + echo "$(date -Iseconds) Removing stale lock (PID $PID)." + rm -f "$LOCK_FILE" +fi + +# --- 2. Check credits --- +# Parse remaining usage from claude. If under 5% remaining, skip. +if command -v claude &>/dev/null; then + USAGE_OUTPUT=$(claude --output-format json -p "Reply with just the word OK" 2>/dev/null | head -1 || echo "") + # Fallback: if we can't check, proceed anyway. +fi + +# --- 3. Pick oldest ticket --- +TICKET=$(find "$QUEUE_DIR" -name 'ticket-*.json' -type f 2>/dev/null | sort | head -1) +if [ -z "$TICKET" ]; then + exit 0 # No work +fi + +TICKET_BASENAME=$(basename "$TICKET") +echo "$(date -Iseconds) Processing ticket: $TICKET_BASENAME" + +# --- 4. Lock --- +echo $$ > "$LOCK_FILE" +cleanup() { + rm -f "$LOCK_FILE" + echo "$(date -Iseconds) Lock released." +} +trap cleanup EXIT + +# --- 5. Move to active --- +mv "$TICKET" "$ACTIVE_DIR/" +TICKET_FILE="$ACTIVE_DIR/$TICKET_BASENAME" + +# --- 6. Extract ticket data --- +REPO_OWNER=$(jq -r .repo_owner "$TICKET_FILE") +REPO_NAME=$(jq -r .repo_name "$TICKET_FILE") +ISSUE_NUM=$(jq -r .issue_number "$TICKET_FILE") +ISSUE_TITLE=$(jq -r .issue_title "$TICKET_FILE") +ISSUE_BODY=$(jq -r .issue_body "$TICKET_FILE") +TARGET_BRANCH=$(jq -r .target_branch "$TICKET_FILE") +FORGE_URL=$(jq -r .forge_url "$TICKET_FILE") +FORGE_TOKEN=$(jq -r .forge_token "$TICKET_FILE") + +echo "$(date -Iseconds) Issue: ${REPO_OWNER}/${REPO_NAME}#${ISSUE_NUM} - ${ISSUE_TITLE}" + +# --- 7. Clone or update repo --- +JOB_DIR="$WORK_DIR/jobs/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}" +REPO_DIR="$JOB_DIR/$REPO_NAME" +mkdir -p "$JOB_DIR" + +CLONE_URL="https://darbs-claude:${FORGE_TOKEN}@${FORGE_URL#https://}/${REPO_OWNER}/${REPO_NAME}.git" + +if [ -d "$REPO_DIR/.git" ]; then + echo "$(date -Iseconds) Updating existing clone..." + cd "$REPO_DIR" + git fetch origin + git checkout "$TARGET_BRANCH" 2>/dev/null || git checkout -b "$TARGET_BRANCH" "origin/$TARGET_BRANCH" + git pull origin "$TARGET_BRANCH" +else + echo "$(date -Iseconds) Cloning repo..." + git clone -b "$TARGET_BRANCH" "$CLONE_URL" "$REPO_DIR" + cd "$REPO_DIR" +fi + +# --- 8. Build prompt --- +PROMPT="You are working on issue #${ISSUE_NUM} in ${REPO_OWNER}/${REPO_NAME}. + +Title: ${ISSUE_TITLE} + +Description: +${ISSUE_BODY} + +The repo is cloned at the current directory on branch '${TARGET_BRANCH}'. +Create a feature branch from '${TARGET_BRANCH}', make minimal targeted changes, commit referencing #${ISSUE_NUM}, and push. +Then create a PR targeting '${TARGET_BRANCH}' using the forgejo MCP tools or git push." + +# --- 9. Run Claude --- +LOG_FILE="$LOG_DIR/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}.log" +echo "$(date -Iseconds) Running claude..." +echo "$PROMPT" | claude -p \ + --dangerously-skip-permissions \ + --output-format text \ + > "$LOG_FILE" 2>&1 +EXIT_CODE=$? +echo "$(date -Iseconds) Claude exited with code: $EXIT_CODE" + +# --- 10. Move to done --- +mv "$TICKET_FILE" "$DONE_DIR/" + +# --- 11. Report result back to Forgejo --- +if [ $EXIT_CODE -eq 0 ]; then + COMMENT="Agent completed work on #${ISSUE_NUM}. Exit code: 0." +else + COMMENT="Agent failed on #${ISSUE_NUM} (exit code: ${EXIT_CODE}). Check logs on agent machine." +fi + +curl -s -X POST "${FORGE_URL}/api/v1/repos/${REPO_OWNER}/${REPO_NAME}/issues/${ISSUE_NUM}/comments" \ + -H "Authorization: token $FORGE_TOKEN" \ + -H "Content-Type: application/json" \ + -d "$(jq -n --arg body "$COMMENT" '{body: $body}')" \ + > /dev/null 2>&1 || true + +echo "$(date -Iseconds) Done: $TICKET_BASENAME (exit: $EXIT_CODE)"