agent/docs/flow/RFC.flow-gather-training-data.md
Commit be78c27561 (Snider, 2026-03-30): docs: add full RFC specs for agent dispatch (AX principles + go/agent + core/agent + php/agent specs). Temporary, needed in-repo until the core-agent mount bug is fixed.
Co-Authored-By: Virgil <virgil@lethean.io>

name: flow-gather-training-data
description: Use when capturing training data from completed flows. Records structural signals (IDs, timestamps, SHAs) to JSONL journals for model training.

Flow: Gather Training Data

Continuously capture PR/issue state observations for training the agentic orchestrator model.


Purpose

Build a time-series dataset of:

  1. Input signals - PR state, CI status, review counts, timing
  2. Actions taken - what the orchestrator decided
  3. Outcomes - whether it worked, and how long resolution took

This enables training a model to predict correct actions from signals alone.
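Once the three measurements are joined on (repo, pr, nearest observation before the action timestamp), one training example might look like the object below. This is a hypothetical shape, built here with jq on canned values, not the output of any script in this doc:

```shell
# hypothetical assembled training example (values invented for illustration)
jq -n '{
  signals: { merge_state: "CLEAN", checks_passing: 8, checks_failing: 0,
             reviews_approved: 1, pr_age_hours: 48 },
  action:  { name: "wait", reason: "auto-merge enabled, checks passing" },
  outcome: { result: "success", resolution_hours: 0.5 }
}'
```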


Infrastructure

InfluxDB Setup

# Install (Ubuntu 24.04)
curl -sL https://repos.influxdata.com/influxdata-archive.key | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/influxdata-archive.gpg
echo "deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive.gpg] https://repos.influxdata.com/ubuntu noble stable" | sudo tee /etc/apt/sources.list.d/influxdata.list
sudo apt-get update && sudo apt-get install -y influxdb2 influxdb2-cli

# Start service
sudo systemctl enable influxdb --now

# Initial setup (interactive)
influx setup \
  --org agentic \
  --bucket training \
  --username claude \
  --password <password> \
  --force

# Create API token for writes
influx auth create --org agentic --write-bucket training --description "training-data-capture"

Store the token in ~/.influx_token (chmod 600).
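One way to write the token file with the right mode from the start, rather than creating it world-readable and chmod-ing afterwards (assumes the token text is already in $TOKEN, e.g. pasted from the `influx auth create` output):

```shell
# subshell keeps the umask change local; files created under umask 077 get mode 600
( umask 077; printf '%s\n' "$TOKEN" > ~/.influx_token )
```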

Schema (InfluxDB Line Protocol)

# Measurement: pr_observation (wrapped here for readability; each point is a single line)
pr_observation,repo=dappcore/core,pr=315,author=jules[bot] \
  merge_state="CLEAN",mergeable=true,is_draft=false,\
  checks_total=8i,checks_passing=8i,checks_failing=0i,\
  reviews_approved=1i,reviews_changes_requested=0i,\
  threads_total=5i,threads_unresolved=0i,\
  pr_age_hours=48i,last_push_hours=2i,\
  conflict_attempts=0i,review_fix_attempts=0i \
  1707123600000000000

# Measurement: action_taken
action_taken,repo=dappcore/core,pr=315 \
  action="wait",reason="auto-merge enabled, checks passing" \
  1707123600000000000

# Measurement: outcome
outcome,repo=dappcore/core,pr=315 \
  result="success",detail="merged via auto-merge",resolution_hours=0.5 \
  1707125400000000000
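Line protocol requires escaping: tag values may not contain unescaped spaces, commas, or equals signs, and string field values (like the reason strings above) may not contain raw double quotes. The capture script below does not yet do this; these helper functions are a sketch of what it would need:

```shell
# sketch: line-protocol escape helpers
lp_escape_tag() {                  # for tag keys and tag values
    printf '%s' "$1" | sed -e 's/ /\\ /g' -e 's/,/\\,/g' -e 's/=/\\=/g'
}
lp_escape_field() {                # for string field values (inside "...")
    printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

lp_escape_tag "jules[bot] test"    # -> jules[bot]\ test
```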

Capture Script

Location: ~/infra/tasks-agentic/training-data/capture-to-influx.sh

#!/bin/bash
# capture-to-influx.sh - Capture PR states to InfluxDB
set -euo pipefail

INFLUX_HOST="${INFLUX_HOST:-http://localhost:8086}"
INFLUX_ORG="${INFLUX_ORG:-agentic}"
INFLUX_BUCKET="${INFLUX_BUCKET:-training}"
INFLUX_TOKEN="${INFLUX_TOKEN:-$(cat ~/.influx_token 2>/dev/null)}"
REPO="${1:-dappcore/core}"

capture_pr_to_influx() {
    local repo=$1
    local pr=$2
    local timestamp
    timestamp=$(date +%s%N)

    # Get PR data
    local data
    data=$(gh pr view "$pr" --repo "$repo" --json \
        number,mergeable,mergeStateStatus,statusCheckRollup,latestReviews,reviewDecision,labels,author,createdAt,updatedAt,commits,autoMergeRequest,isDraft \
        2>/dev/null)

    # Extract fields
    local merge_state=$(echo "$data" | jq -r '.mergeStateStatus // "UNKNOWN"')
    local mergeable=$(echo "$data" | jq -r 'if .mergeable == "MERGEABLE" then "true" else "false" end')
    local is_draft=$(echo "$data" | jq -r '.isDraft // false')
    local checks_total=$(echo "$data" | jq '[.statusCheckRollup[]? | select(.name != null)] | length')
    local checks_passing=$(echo "$data" | jq '[.statusCheckRollup[]? | select(.conclusion == "SUCCESS")] | length')
    local checks_failing=$(echo "$data" | jq '[.statusCheckRollup[]? | select(.conclusion == "FAILURE")] | length')
    local reviews_approved=$(echo "$data" | jq '[.latestReviews[]? | select(.state == "APPROVED")] | length')
    local reviews_changes=$(echo "$data" | jq '[.latestReviews[]? | select(.state == "CHANGES_REQUESTED")] | length')
    local author=$(echo "$data" | jq -r '.author.login // "unknown"')
    local auto_merge=$(echo "$data" | jq -r 'if .autoMergeRequest != null then "true" else "false" end')

    # Calculate ages
    local created=$(echo "$data" | jq -r '.createdAt')
    local updated=$(echo "$data" | jq -r '.updatedAt')
    # NOTE: date -d is GNU (Linux). On macOS use: date -j -f "%Y-%m-%dT%H:%M:%SZ" "$created" +%s
    local pr_age_hours=$(( ($(date +%s) - $(date -d "$created" +%s)) / 3600 ))
    local last_activity_hours=$(( ($(date +%s) - $(date -d "$updated" +%s)) / 3600 ))

    # Build line protocol
    local line="pr_observation,repo=${repo//\//_},pr=${pr},author=${author} "
    line+="merge_state=\"${merge_state}\","
    line+="mergeable=${mergeable},"
    line+="is_draft=${is_draft},"
    line+="checks_total=${checks_total}i,"
    line+="checks_passing=${checks_passing}i,"
    line+="checks_failing=${checks_failing}i,"
    line+="reviews_approved=${reviews_approved}i,"
    line+="reviews_changes_requested=${reviews_changes}i,"
    line+="auto_merge_enabled=${auto_merge},"
    line+="pr_age_hours=${pr_age_hours}i,"
    line+="last_activity_hours=${last_activity_hours}i "
    line+="${timestamp}"

    # Write to InfluxDB
    curl -s -XPOST "${INFLUX_HOST}/api/v2/write?org=${INFLUX_ORG}&bucket=${INFLUX_BUCKET}&precision=ns" \
        -H "Authorization: Token ${INFLUX_TOKEN}" \
        -H "Content-Type: text/plain" \
        --data-raw "$line"

    echo "Captured PR #${pr}"
}

# Capture all open PRs
for pr in $(gh pr list --repo "$REPO" --state open --json number --jq '.[].number'); do
    capture_pr_to_influx "$REPO" "$pr"
done
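The jq extraction can be sanity-checked offline against a canned gh-style payload (all values hypothetical) before pointing the script at live PRs:

```shell
# canned `gh pr view --json` payload with invented values
data='{"mergeStateStatus":"CLEAN","mergeable":"MERGEABLE","isDraft":false,"statusCheckRollup":[{"name":"build","conclusion":"SUCCESS"},{"name":"test","conclusion":"FAILURE"}],"latestReviews":[{"state":"APPROVED"}],"author":{"login":"jules[bot]"},"autoMergeRequest":null}'

echo "$data" | jq -r '.mergeStateStatus // "UNKNOWN"'                                    # CLEAN
echo "$data" | jq '[.statusCheckRollup[]? | select(.conclusion == "FAILURE")] | length'  # 1
echo "$data" | jq -r 'if .autoMergeRequest != null then "true" else "false" end'         # false
```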

Cron Schedule

# Add to crontab -e
# Capture every 15 minutes
*/15 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core >> /home/claude/logs/training-capture.log 2>&1

# Also capture PHP repos hourly (lower priority)
0 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core-php >> /home/claude/logs/training-capture.log 2>&1
0 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core-mcp >> /home/claude/logs/training-capture.log 2>&1
0 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core-api >> /home/claude/logs/training-capture.log 2>&1
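If a capture run ever takes longer than its interval (many open PRs, slow API), overlapping cron invocations would double-write points. Wrapping the job in flock(1) from util-linux serializes runs; a sketch of the 15-minute entry:

```shell
*/15 * * * * flock -n /tmp/training-capture.lock /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core >> /home/claude/logs/training-capture.log 2>&1
```

With `-n`, a run that finds the lock held exits immediately instead of queueing.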

Recording Actions & Outcomes

When Orchestrator Takes Action

After any orchestration action, record it:

record_action() {
    local repo=$1 pr=$2 action=$3 reason=$4
    local timestamp=$(date +%s%N)
    local line="action_taken,repo=${repo//\//_},pr=${pr} action=\"${action}\",reason=\"${reason}\" ${timestamp}"

    curl -s -XPOST "${INFLUX_HOST}/api/v2/write?org=${INFLUX_ORG}&bucket=${INFLUX_BUCKET}&precision=ns" \
        -H "Authorization: Token ${INFLUX_TOKEN}" \
        --data-raw "$line"
}

# Examples:
record_action "dappcore/core" 315 "wait" "auto-merge enabled, all checks passing"
record_action "dappcore/core" 307 "request_review_fix" "unresolved threads, attempt 1"
record_action "dappcore/core" 319 "resolve_conflict" "conflict_attempts >= 2, manual resolution"

When PR Resolves

When a PR merges, closes, or is escalated:

record_outcome() {
    local repo=$1 pr=$2 result=$3 detail=$4 resolution_hours=$5
    local timestamp=$(date +%s%N)
    local line="outcome,repo=${repo//\//_},pr=${pr} result=\"${result}\",detail=\"${detail}\",resolution_hours=${resolution_hours} ${timestamp}"

    curl -s -XPOST "${INFLUX_HOST}/api/v2/write?org=${INFLUX_ORG}&bucket=${INFLUX_BUCKET}&precision=ns" \
        -H "Authorization: Token ${INFLUX_TOKEN}" \
        --data-raw "$line"
}

# Examples:
record_outcome "dappcore/core" 315 "success" "merged via auto-merge" 0.5
record_outcome "dappcore/core" 307 "success" "merged after 2 review fix requests" 4.2
record_outcome "dappcore/core" 291 "escalated" "conflict unresolvable after manual attempt" 72.0
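The resolution_hours argument can be derived from two ISO-8601 timestamps, e.g. the PR's createdAt and the merge time. A sketch using GNU date (see the macOS note in the capture script):

```shell
# elapsed hours between two ISO-8601 timestamps, one decimal place
hours_between() {
    awk -v a="$(date -d "$1" +%s)" -v b="$(date -d "$2" +%s)" \
        'BEGIN { printf "%.1f\n", (b - a) / 3600 }'
}

hours_between "2026-02-05T10:00:00Z" "2026-02-05T14:12:00Z"   # 4.2
```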

Query Examples

Flux queries for analysis

// All observations for a PR over time
from(bucket: "training")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "pr_observation")
  |> filter(fn: (r) => r.pr == "315")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

// Outcome counts by result (the outcome measurement carries no action tag,
// so group on the result value; record action as a tag on outcome if
// per-action success rates are needed)
from(bucket: "training")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "outcome")
  |> filter(fn: (r) => r._field == "result")
  |> group(columns: ["_value"])
  |> count()

// Average resolution time per repo
from(bucket: "training")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "outcome")
  |> filter(fn: (r) => r._field == "resolution_hours")
  |> group(columns: ["repo"])
  |> mean()

Export for Training

# Export for model training. Note: `influx query --raw` emits annotated CSV,
# not JSON, so it cannot be piped straight into jq; export CSV, then convert.
influx query '
from(bucket: "training")
  |> range(start: -90d)
  |> filter(fn: (r) => r._measurement == "pr_observation")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
' --raw > training-export.csv

# drop '#'-prefixed annotation lines, then CSV rows -> JSONL (single result table assumed)
grep -v '^#' training-export.csv | python3 -c '
import csv, json, sys
for row in csv.DictReader(sys.stdin):
    print(json.dumps(row))
' > training-export.jsonl

Integration with issue-epic.md

The issue-epic flow should call record_action at each decision point:

  1. Step 3 (CI Gate) - After checking checks: record_action $REPO $PR "wait" "CI running"
  2. Step 5 (Fix Review) - After sending fix request: record_action $REPO $PR "request_review_fix" "unresolved threads"
  3. Step 7 (Update Branch) - After conflict request: record_action $REPO $PR "request_conflict_fix" "merge conflict detected"
  4. Step 8 (Merge) - When PR merges: record_outcome $REPO $PR "success" "merged" $hours

Created: 2026-02-05
Part of: agentic pipeline training infrastructure