feat(jobrunner): add agent dispatch handler and queue runner
Dispatch handler matches child issues that need coding (no PR yet, assigned to a known agent) and SCPs ticket JSON to the agent's queue directory via SSH. Includes dedup across queue/active/done and posts dispatch comments on issues.

- Extend PipelineSignal with NeedsCoding, Assignee, IssueTitle, IssueBody
- Extend ForgejoSource to emit signals for unstarted children
- Add DispatchHandler with Match/Execute (SCP ticket delivery)
- Add agent-runner.sh cron-based queue runner for agent machines
- Wire dispatch handler into headless mode

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d32c51d816
commit
849695fe39
6 changed files with 402 additions and 1 deletions
|
|
@ -42,7 +42,8 @@ func startHeadless() {
|
|||
}
|
||||
|
||||
// Forge client
|
||||
forgeClient, err := forge.NewFromConfig("", "")
|
||||
forgeURL, forgeToken, _ := forge.ResolveConfig("", "")
|
||||
forgeClient, err := forge.New(forgeURL, forgeToken)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create forge client: %v", err)
|
||||
}
|
||||
|
|
@ -64,6 +65,12 @@ func startHeadless() {
|
|||
enableAutoMerge := handlers.NewEnableAutoMergeHandler(forgeClient)
|
||||
tickParent := handlers.NewTickParentHandler(forgeClient)
|
||||
|
||||
// Agent dispatch — maps Forgejo usernames to SSH targets.
|
||||
agentTargets := map[string]handlers.AgentTarget{
|
||||
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "/home/claude/ai-work/queue"},
|
||||
}
|
||||
dispatch := handlers.NewDispatchHandler(forgeClient, forgeURL, forgeToken, agentTargets)
|
||||
|
||||
// Build poller
|
||||
poller := jobrunner.NewPoller(jobrunner.PollerConfig{
|
||||
Sources: []jobrunner.JobSource{source},
|
||||
|
|
@ -73,6 +80,7 @@ func startHeadless() {
|
|||
dismissReviews,
|
||||
enableAutoMerge,
|
||||
tickParent,
|
||||
dispatch, // Last — only matches NeedsCoding signals
|
||||
},
|
||||
Journal: journal,
|
||||
PollInterval: 60 * time.Second,
|
||||
|
|
|
|||
|
|
@ -115,7 +115,27 @@ func (s *ForgejoSource) pollRepo(_ context.Context, owner, repo string) ([]*jobr
|
|||
unchecked, _ := parseEpicChildren(epic.Body)
|
||||
for _, childNum := range unchecked {
|
||||
pr := findLinkedPR(prs, childNum)
|
||||
|
||||
if pr == nil {
|
||||
// No PR yet — check if the child issue is assigned (needs coding).
|
||||
childIssue, err := s.forge.GetIssue(owner, repo, int64(childNum))
|
||||
if err != nil {
|
||||
log.Error("fetch child issue failed", "repo", owner+"/"+repo, "issue", childNum, "err", err)
|
||||
continue
|
||||
}
|
||||
if len(childIssue.Assignees) > 0 && childIssue.Assignees[0].UserName != "" {
|
||||
sig := &jobrunner.PipelineSignal{
|
||||
EpicNumber: epic.Number,
|
||||
ChildNumber: childNum,
|
||||
RepoOwner: owner,
|
||||
RepoName: repo,
|
||||
NeedsCoding: true,
|
||||
Assignee: childIssue.Assignees[0].UserName,
|
||||
IssueTitle: childIssue.Title,
|
||||
IssueBody: childIssue.Body,
|
||||
}
|
||||
signals = append(signals, sig)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
190
pkg/jobrunner/handlers/dispatch.go
Normal file
190
pkg/jobrunner/handlers/dispatch.go
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/host-uk/core/pkg/forge"
|
||||
"github.com/host-uk/core/pkg/jobrunner"
|
||||
"github.com/host-uk/core/pkg/log"
|
||||
)
|
||||
|
||||
// AgentTarget describes the SSH-reachable machine that receives tickets for
// one agent. Targets are looked up by the agent's Forgejo username.
type AgentTarget struct {
	// Host is the SSH destination, e.g. "claude@192.168.0.201".
	Host string
	// QueueDir is the queue directory on the remote machine,
	// e.g. "~/ai-work/queue".
	QueueDir string
}
|
||||
|
||||
// DispatchTicket is the JSON document dropped into an agent's queue
// directory. One ticket describes one child issue that needs coding.
type DispatchTicket struct {
	// ID identifies this particular dispatch of the issue.
	ID string `json:"id"`
	// RepoOwner and RepoName locate the repository on the forge.
	RepoOwner string `json:"repo_owner"`
	RepoName  string `json:"repo_name"`
	// IssueNumber, IssueTitle and IssueBody describe the child issue itself.
	IssueNumber int    `json:"issue_number"`
	IssueTitle  string `json:"issue_title"`
	IssueBody   string `json:"issue_body"`
	// TargetBranch is the branch the agent should base its work on.
	TargetBranch string `json:"target_branch"`
	// EpicNumber is the parent epic the child issue belongs to.
	EpicNumber int `json:"epic_number"`
	// ForgeURL and ForgeToken are the forge endpoint and API token handed to
	// the agent. The token is serialized into the ticket as plain text.
	ForgeURL   string `json:"forge_url"`
	ForgeToken string `json:"forge_token"`
	// CreatedAt is the creation timestamp, carried as a string.
	CreatedAt string `json:"created_at"`
}
|
||||
|
||||
// DispatchHandler dispatches coding work to remote agent machines via SSH/SCP.
|
||||
type DispatchHandler struct {
|
||||
forge *forge.Client
|
||||
forgeURL string
|
||||
token string
|
||||
agents map[string]AgentTarget
|
||||
}
|
||||
|
||||
// NewDispatchHandler creates a handler that dispatches tickets to agent machines.
|
||||
func NewDispatchHandler(client *forge.Client, forgeURL, token string, agents map[string]AgentTarget) *DispatchHandler {
|
||||
return &DispatchHandler{
|
||||
forge: client,
|
||||
forgeURL: forgeURL,
|
||||
token: token,
|
||||
agents: agents,
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the handler identifier.
|
||||
func (h *DispatchHandler) Name() string {
|
||||
return "dispatch"
|
||||
}
|
||||
|
||||
// Match returns true for signals where a child issue needs coding (no PR yet)
|
||||
// and the assignee is a known agent.
|
||||
func (h *DispatchHandler) Match(signal *jobrunner.PipelineSignal) bool {
|
||||
if !signal.NeedsCoding {
|
||||
return false
|
||||
}
|
||||
_, ok := h.agents[signal.Assignee]
|
||||
return ok
|
||||
}
|
||||
|
||||
// Execute creates a ticket JSON and SCPs it to the agent's queue directory.
|
||||
func (h *DispatchHandler) Execute(ctx context.Context, signal *jobrunner.PipelineSignal) (*jobrunner.ActionResult, error) {
|
||||
start := time.Now()
|
||||
|
||||
agent, ok := h.agents[signal.Assignee]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown agent: %s", signal.Assignee)
|
||||
}
|
||||
|
||||
// Determine target branch (default to repo default).
|
||||
targetBranch := "new" // TODO: resolve from epic or repo default
|
||||
|
||||
ticket := DispatchTicket{
|
||||
ID: fmt.Sprintf("%s-%s-%d-%d", signal.RepoOwner, signal.RepoName, signal.ChildNumber, time.Now().Unix()),
|
||||
RepoOwner: signal.RepoOwner,
|
||||
RepoName: signal.RepoName,
|
||||
IssueNumber: signal.ChildNumber,
|
||||
IssueTitle: signal.IssueTitle,
|
||||
IssueBody: signal.IssueBody,
|
||||
TargetBranch: targetBranch,
|
||||
EpicNumber: signal.EpicNumber,
|
||||
ForgeURL: h.forgeURL,
|
||||
ForgeToken: h.token,
|
||||
CreatedAt: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
ticketJSON, err := json.MarshalIndent(ticket, "", " ")
|
||||
if err != nil {
|
||||
return &jobrunner.ActionResult{
|
||||
Action: "dispatch",
|
||||
RepoOwner: signal.RepoOwner,
|
||||
RepoName: signal.RepoName,
|
||||
EpicNumber: signal.EpicNumber,
|
||||
ChildNumber: signal.ChildNumber,
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("marshal ticket: %v", err),
|
||||
Timestamp: time.Now(),
|
||||
Duration: time.Since(start),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Check if ticket already exists on agent (dedup).
|
||||
ticketName := fmt.Sprintf("ticket-%s-%s-%d.json", signal.RepoOwner, signal.RepoName, signal.ChildNumber)
|
||||
if h.ticketExists(agent, ticketName) {
|
||||
log.Info("ticket already queued, skipping", "ticket", ticketName, "agent", signal.Assignee)
|
||||
return &jobrunner.ActionResult{
|
||||
Action: "dispatch",
|
||||
RepoOwner: signal.RepoOwner,
|
||||
RepoName: signal.RepoName,
|
||||
EpicNumber: signal.EpicNumber,
|
||||
ChildNumber: signal.ChildNumber,
|
||||
Success: true,
|
||||
Timestamp: time.Now(),
|
||||
Duration: time.Since(start),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SCP ticket to agent queue.
|
||||
remotePath := filepath.Join(agent.QueueDir, ticketName)
|
||||
if err := h.scpTicket(ctx, agent.Host, remotePath, ticketJSON); err != nil {
|
||||
return &jobrunner.ActionResult{
|
||||
Action: "dispatch",
|
||||
RepoOwner: signal.RepoOwner,
|
||||
RepoName: signal.RepoName,
|
||||
EpicNumber: signal.EpicNumber,
|
||||
ChildNumber: signal.ChildNumber,
|
||||
Success: false,
|
||||
Error: fmt.Sprintf("scp ticket: %v", err),
|
||||
Timestamp: time.Now(),
|
||||
Duration: time.Since(start),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Comment on issue.
|
||||
comment := fmt.Sprintf("Dispatched to **%s** agent queue.", signal.Assignee)
|
||||
_ = h.forge.CreateIssueComment(signal.RepoOwner, signal.RepoName, int64(signal.ChildNumber), comment)
|
||||
|
||||
return &jobrunner.ActionResult{
|
||||
Action: "dispatch",
|
||||
RepoOwner: signal.RepoOwner,
|
||||
RepoName: signal.RepoName,
|
||||
EpicNumber: signal.EpicNumber,
|
||||
ChildNumber: signal.ChildNumber,
|
||||
Success: true,
|
||||
Timestamp: time.Now(),
|
||||
Duration: time.Since(start),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// scpTicket writes ticket data to a remote path via SSH.
|
||||
func (h *DispatchHandler) scpTicket(ctx context.Context, host, remotePath string, data []byte) error {
|
||||
// Use ssh + cat instead of scp for piping stdin.
|
||||
cmd := exec.CommandContext(ctx, "ssh",
|
||||
"-o", "StrictHostKeyChecking=accept-new",
|
||||
"-o", "ConnectTimeout=10",
|
||||
host,
|
||||
fmt.Sprintf("cat > %s", remotePath),
|
||||
)
|
||||
cmd.Stdin = strings.NewReader(string(data))
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return log.E("dispatch.scp", fmt.Sprintf("ssh to %s failed: %s", host, string(output)), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ticketExists checks if a ticket file already exists in queue, active, or done.
|
||||
func (h *DispatchHandler) ticketExists(agent AgentTarget, ticketName string) bool {
|
||||
cmd := exec.Command("ssh",
|
||||
"-o", "StrictHostKeyChecking=accept-new",
|
||||
"-o", "ConnectTimeout=10",
|
||||
agent.Host,
|
||||
fmt.Sprintf("test -f %s/%s || test -f %s/../active/%s || test -f %s/../done/%s",
|
||||
agent.QueueDir, ticketName,
|
||||
agent.QueueDir, ticketName,
|
||||
agent.QueueDir, ticketName),
|
||||
)
|
||||
return cmd.Run() == nil
|
||||
}
|
||||
53
pkg/jobrunner/handlers/dispatch_test.go
Normal file
53
pkg/jobrunner/handlers/dispatch_test.go
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
package handlers
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/host-uk/core/pkg/jobrunner"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestDispatch_Match_Good_NeedsCoding(t *testing.T) {
|
||||
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
|
||||
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
|
||||
})
|
||||
sig := &jobrunner.PipelineSignal{
|
||||
NeedsCoding: true,
|
||||
Assignee: "darbs-claude",
|
||||
}
|
||||
assert.True(t, h.Match(sig))
|
||||
}
|
||||
|
||||
func TestDispatch_Match_Bad_HasPR(t *testing.T) {
|
||||
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
|
||||
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
|
||||
})
|
||||
sig := &jobrunner.PipelineSignal{
|
||||
NeedsCoding: false,
|
||||
PRNumber: 7,
|
||||
Assignee: "darbs-claude",
|
||||
}
|
||||
assert.False(t, h.Match(sig))
|
||||
}
|
||||
|
||||
func TestDispatch_Match_Bad_UnknownAgent(t *testing.T) {
|
||||
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
|
||||
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
|
||||
})
|
||||
sig := &jobrunner.PipelineSignal{
|
||||
NeedsCoding: true,
|
||||
Assignee: "unknown-user",
|
||||
}
|
||||
assert.False(t, h.Match(sig))
|
||||
}
|
||||
|
||||
func TestDispatch_Match_Bad_NotAssigned(t *testing.T) {
|
||||
h := NewDispatchHandler(nil, "", "", map[string]AgentTarget{
|
||||
"darbs-claude": {Host: "claude@192.168.0.201", QueueDir: "~/ai-work/queue"},
|
||||
})
|
||||
sig := &jobrunner.PipelineSignal{
|
||||
NeedsCoding: true,
|
||||
Assignee: "",
|
||||
}
|
||||
assert.False(t, h.Match(sig))
|
||||
}
|
||||
|
|
@ -22,6 +22,10 @@ type PipelineSignal struct {
|
|||
LastCommitSHA string
|
||||
LastCommitAt time.Time
|
||||
LastReviewAt time.Time
|
||||
NeedsCoding bool // true if child has no PR (work not started)
|
||||
Assignee string // issue assignee username (for dispatch)
|
||||
IssueTitle string // child issue title (for dispatch prompt)
|
||||
IssueBody string // child issue body (for dispatch prompt)
|
||||
}
|
||||
|
||||
// RepoFullName returns "owner/repo".
|
||||
|
|
|
|||
126
scripts/agent-runner.sh
Executable file
126
scripts/agent-runner.sh
Executable file
|
|
@ -0,0 +1,126 @@
|
|||
#!/bin/bash
# agent-runner.sh — One-at-a-time queue runner for Claude Code agents.
# Deployed to agent machines, triggered by cron every 5 minutes.
#
# Usage: */5 * * * * ~/ai-work/agent-runner.sh >> ~/ai-work/logs/runner.log 2>&1
#
# Each run picks at most one ticket from queue/, moves it to active/, runs
# claude against a clone of the ticket's repository, moves the ticket to
# done/ and posts a result comment on the issue.
set -euo pipefail

WORK_DIR="${HOME}/ai-work"
QUEUE_DIR="${WORK_DIR}/queue"
ACTIVE_DIR="${WORK_DIR}/active"
DONE_DIR="${WORK_DIR}/done"
LOG_DIR="${WORK_DIR}/logs"
LOCK_FILE="${WORK_DIR}/.runner.lock"

# Ensure directories exist.
mkdir -p "$QUEUE_DIR" "$ACTIVE_DIR" "$DONE_DIR" "$LOG_DIR"

# --- 1. Check lock (is another run active?) ---
# The lock file holds the PID of the run that created it; a lock whose PID is
# no longer alive is considered stale and removed.
if [ -f "$LOCK_FILE" ]; then
    PID=$(cat "$LOCK_FILE" 2>/dev/null || echo "")
    if [ -n "$PID" ] && kill -0 "$PID" 2>/dev/null; then
        echo "$(date -Iseconds) Runner already active (PID $PID), exiting."
        exit 0
    fi
    echo "$(date -Iseconds) Removing stale lock (PID $PID)."
    rm -f "$LOCK_FILE"
fi

# --- 2. Check credits ---
# Parse remaining usage from claude. If under 5% remaining, skip.
# NOTE: the threshold check is not implemented yet; USAGE_OUTPUT is captured
# but not consulted, so the run always proceeds.
if command -v claude &>/dev/null; then
    USAGE_OUTPUT=$(claude --output-format json -p "Reply with just the word OK" 2>/dev/null | head -1 || echo "")
    # Fallback: if we can't check, proceed anyway.
fi

# --- 3. Pick oldest ticket ---
# Tickets are ordered by sorted path name.
TICKET=$(find "$QUEUE_DIR" -name 'ticket-*.json' -type f 2>/dev/null | sort | head -1)
if [ -z "$TICKET" ]; then
    exit 0 # No work
fi

TICKET_BASENAME=$(basename "$TICKET")
echo "$(date -Iseconds) Processing ticket: $TICKET_BASENAME"

# --- 4. Lock ---
echo $$ > "$LOCK_FILE"
cleanup() {
    rm -f "$LOCK_FILE"
    echo "$(date -Iseconds) Lock released."
}
trap cleanup EXIT

# --- 5. Move to active ---
mv "$TICKET" "$ACTIVE_DIR/"
TICKET_FILE="$ACTIVE_DIR/$TICKET_BASENAME"

# --- 6. Extract ticket data ---
REPO_OWNER=$(jq -r .repo_owner "$TICKET_FILE")
REPO_NAME=$(jq -r .repo_name "$TICKET_FILE")
ISSUE_NUM=$(jq -r .issue_number "$TICKET_FILE")
ISSUE_TITLE=$(jq -r .issue_title "$TICKET_FILE")
ISSUE_BODY=$(jq -r .issue_body "$TICKET_FILE")
TARGET_BRANCH=$(jq -r .target_branch "$TICKET_FILE")
FORGE_URL=$(jq -r .forge_url "$TICKET_FILE")
FORGE_TOKEN=$(jq -r .forge_token "$TICKET_FILE")

echo "$(date -Iseconds) Issue: ${REPO_OWNER}/${REPO_NAME}#${ISSUE_NUM} - ${ISSUE_TITLE}"

# --- 7. Clone or update repo ---
JOB_DIR="$WORK_DIR/jobs/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}"
REPO_DIR="$JOB_DIR/$REPO_NAME"
mkdir -p "$JOB_DIR"

# The token is embedded in the clone URL; only an https:// prefix is stripped
# from FORGE_URL before the credentials are inserted.
CLONE_URL="https://darbs-claude:${FORGE_TOKEN}@${FORGE_URL#https://}/${REPO_OWNER}/${REPO_NAME}.git"

if [ -d "$REPO_DIR/.git" ]; then
    echo "$(date -Iseconds) Updating existing clone..."
    cd "$REPO_DIR"
    git fetch origin
    git checkout "$TARGET_BRANCH" 2>/dev/null || git checkout -b "$TARGET_BRANCH" "origin/$TARGET_BRANCH"
    git pull origin "$TARGET_BRANCH"
else
    echo "$(date -Iseconds) Cloning repo..."
    git clone -b "$TARGET_BRANCH" "$CLONE_URL" "$REPO_DIR"
    cd "$REPO_DIR"
fi

# --- 8. Build prompt ---
PROMPT="You are working on issue #${ISSUE_NUM} in ${REPO_OWNER}/${REPO_NAME}.

Title: ${ISSUE_TITLE}

Description:
${ISSUE_BODY}

The repo is cloned at the current directory on branch '${TARGET_BRANCH}'.
Create a feature branch from '${TARGET_BRANCH}', make minimal targeted changes, commit referencing #${ISSUE_NUM}, and push.
Then create a PR targeting '${TARGET_BRANCH}' using the forgejo MCP tools or git push."

# --- 9. Run Claude ---
LOG_FILE="$LOG_DIR/${REPO_OWNER}-${REPO_NAME}-${ISSUE_NUM}.log"
echo "$(date -Iseconds) Running claude..."
# Capture the exit status with `||` instead of a following `$?`: under
# `set -e` with pipefail a failing pipeline would otherwise abort the script
# right here, leaving the ticket in active/ and skipping the failure report.
EXIT_CODE=0
echo "$PROMPT" | claude -p \
    --dangerously-skip-permissions \
    --output-format text \
    > "$LOG_FILE" 2>&1 || EXIT_CODE=$?
echo "$(date -Iseconds) Claude exited with code: $EXIT_CODE"

# --- 10. Move to done ---
mv "$TICKET_FILE" "$DONE_DIR/"

# --- 11. Report result back to Forgejo ---
if [ $EXIT_CODE -eq 0 ]; then
    COMMENT="Agent completed work on #${ISSUE_NUM}. Exit code: 0."
else
    COMMENT="Agent failed on #${ISSUE_NUM} (exit code: ${EXIT_CODE}). Check logs on agent machine."
fi

# Best-effort: a failed comment must not fail the run.
curl -s -X POST "${FORGE_URL}/api/v1/repos/${REPO_OWNER}/${REPO_NAME}/issues/${ISSUE_NUM}/comments" \
    -H "Authorization: token $FORGE_TOKEN" \
    -H "Content-Type: application/json" \
    -d "$(jq -n --arg body "$COMMENT" '{body: $body}')" \
    > /dev/null 2>&1 || true

echo "$(date -Iseconds) Done: $TICKET_BASENAME (exit: $EXIT_CODE)"
|
||||
Loading…
Add table
Reference in a new issue