| name | description |
|---|---|
| flow-gather-training-data | Use when capturing training data from completed flows. Records structural signals (IDs, timestamps, SHAs) to InfluxDB for model training. |
# Flow: Gather Training Data
Continuously capture PR/issue state observations for training the agentic orchestrator model.
## Purpose
Build a time-series dataset of:
- **Input signals** - PR state, CI status, review counts, timing
- **Actions taken** - what the orchestrator decided
- **Outcomes** - did it work? how long to resolution?
This enables training a model to predict correct actions from signals alone.
## Infrastructure
### InfluxDB Setup

```bash
# Install (Ubuntu 24.04)
curl -sL https://repos.influxdata.com/influxdata-archive.key | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/influxdata-archive.gpg
echo "deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive.gpg] https://repos.influxdata.com/ubuntu noble stable" | sudo tee /etc/apt/sources.list.d/influxdata.list
sudo apt-get update && sudo apt-get install -y influxdb2 influxdb2-cli

# Start service
sudo systemctl enable influxdb --now

# Initial setup (--force skips the confirmation prompt)
influx setup \
  --org agentic \
  --bucket training \
  --username claude \
  --password <password> \
  --force

# Create API token for writes
influx auth create --org agentic --write-bucket training --description "training-data-capture"
```

Store the token in `~/.influx_token` (`chmod 600`).
### Schema (InfluxDB Line Protocol)

```text
# Measurement: pr_observation
# (wrapped with `\` here for readability; a real line-protocol point
#  must be written as one physical line)
pr_observation,repo=dappcore/core,pr=315,author=jules[bot] \
merge_state="CLEAN",mergeable=true,is_draft=false,\
checks_total=8i,checks_passing=8i,checks_failing=0i,\
reviews_approved=1i,reviews_changes_requested=0i,\
threads_total=5i,threads_unresolved=0i,\
pr_age_hours=48i,last_push_hours=2i,\
conflict_attempts=0i,review_fix_attempts=0i \
1707123600000000000

# Measurement: action_taken
action_taken,repo=dappcore/core,pr=315 \
action="wait",reason="auto-merge enabled, checks passing" \
1707123600000000000

# Measurement: outcome
outcome,repo=dappcore/core,pr=315 \
result="success",detail="merged via auto-merge",resolution_hours=0.5 \
1707125400000000000
```
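Line protocol tag values must backslash-escape commas, spaces, and equals signs; the `author` tag above (`jules[bot]`) happens to be safe, but arbitrary values are not. A minimal sketch of an escaping helper (hypothetical, not part of the capture script):

```shell
# escape_tag: backslash-escape the characters that are special in
# line-protocol tag keys and values: commas, spaces, and equals signs.
escape_tag() {
    printf '%s' "$1" | sed -e 's/,/\\,/g' -e 's/ /\\ /g' -e 's/=/\\=/g'
}

escape_tag "jules [bot]"   # -> jules\ [bot]
```

Escaping commas before spaces matters only in that no pass may introduce characters a later pass would re-escape; this ordering is safe.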
### Capture Script

Location: `~/infra/tasks-agentic/training-data/capture-to-influx.sh`

```bash
#!/bin/bash
# capture-to-influx.sh - Capture PR states to InfluxDB
set -euo pipefail

INFLUX_HOST="${INFLUX_HOST:-http://localhost:8086}"
INFLUX_ORG="${INFLUX_ORG:-agentic}"
INFLUX_BUCKET="${INFLUX_BUCKET:-training}"
INFLUX_TOKEN="${INFLUX_TOKEN:-$(cat ~/.influx_token 2>/dev/null)}"
REPO="${1:-dappcore/core}"

capture_pr_to_influx() {
    local repo=$1
    local pr=$2
    local timestamp
    timestamp=$(date +%s%N)

    # Get PR data (the --json field list must be a single argument with no
    # embedded whitespace, so keep it on one line)
    local data
    data=$(gh pr view "$pr" --repo "$repo" \
        --json number,mergeable,mergeStateStatus,statusCheckRollup,latestReviews,reviewDecision,labels,author,createdAt,updatedAt,commits,autoMergeRequest,isDraft \
        2>/dev/null)

    # Extract fields
    local merge_state=$(echo "$data" | jq -r '.mergeStateStatus // "UNKNOWN"')
    local mergeable=$(echo "$data" | jq -r 'if .mergeable == "MERGEABLE" then "true" else "false" end')
    local is_draft=$(echo "$data" | jq -r '.isDraft // false')
    local checks_total=$(echo "$data" | jq '[.statusCheckRollup[]? | select(.name != null)] | length')
    local checks_passing=$(echo "$data" | jq '[.statusCheckRollup[]? | select(.conclusion == "SUCCESS")] | length')
    local checks_failing=$(echo "$data" | jq '[.statusCheckRollup[]? | select(.conclusion == "FAILURE")] | length')
    local reviews_approved=$(echo "$data" | jq '[.latestReviews[]? | select(.state == "APPROVED")] | length')
    local reviews_changes=$(echo "$data" | jq '[.latestReviews[]? | select(.state == "CHANGES_REQUESTED")] | length')
    local author=$(echo "$data" | jq -r '.author.login // "unknown"')
    local auto_merge=$(echo "$data" | jq -r 'if .autoMergeRequest != null then "true" else "false" end')

    # Calculate ages
    local created=$(echo "$data" | jq -r '.createdAt')
    local updated=$(echo "$data" | jq -r '.updatedAt')
    # NOTE: date -d is GNU (Linux). On macOS use: date -j -f "%Y-%m-%dT%H:%M:%SZ" "$created" +%s
    local pr_age_hours=$(( ($(date +%s) - $(date -d "$created" +%s)) / 3600 ))
    local last_activity_hours=$(( ($(date +%s) - $(date -d "$updated" +%s)) / 3600 ))

    # Build line protocol
    local line="pr_observation,repo=${repo//\//_},pr=${pr},author=${author} "
    line+="merge_state=\"${merge_state}\","
    line+="mergeable=${mergeable},"
    line+="is_draft=${is_draft},"
    line+="checks_total=${checks_total}i,"
    line+="checks_passing=${checks_passing}i,"
    line+="checks_failing=${checks_failing}i,"
    line+="reviews_approved=${reviews_approved}i,"
    line+="reviews_changes_requested=${reviews_changes}i,"
    line+="auto_merge_enabled=${auto_merge},"
    line+="pr_age_hours=${pr_age_hours}i,"
    line+="last_activity_hours=${last_activity_hours}i "
    line+="${timestamp}"

    # Write to InfluxDB
    curl -s -XPOST "${INFLUX_HOST}/api/v2/write?org=${INFLUX_ORG}&bucket=${INFLUX_BUCKET}&precision=ns" \
        -H "Authorization: Token ${INFLUX_TOKEN}" \
        -H "Content-Type: text/plain" \
        --data-raw "$line"

    echo "Captured PR #${pr}"
}

# Capture all open PRs
for pr in $(gh pr list --repo "$REPO" --state open --json number --jq '.[].number'); do
    capture_pr_to_influx "$REPO" "$pr"
done
```
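The age math in the script depends on GNU `date -d`. A small portability shim (a sketch; the macOS fallback is the exact form from the NOTE in the script) that tries the GNU flag first:

```shell
# iso_to_epoch: convert a GitHub ISO-8601 UTC timestamp to epoch seconds.
# Tries GNU date (-d) first, then the BSD/macOS parse form (-j -f).
iso_to_epoch() {
    date -d "$1" +%s 2>/dev/null \
        || date -j -f "%Y-%m-%dT%H:%M:%SZ" "$1" +%s
}

iso_to_epoch "2024-02-05T09:00:00Z"   # -> 1707123600
```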
### Cron Schedule

```cron
# Add to crontab -e

# Capture every 15 minutes
*/15 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core >> /home/claude/logs/training-capture.log 2>&1

# Also capture PHP repos hourly (lower priority)
0 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core-php >> /home/claude/logs/training-capture.log 2>&1
0 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core-mcp >> /home/claude/logs/training-capture.log 2>&1
0 * * * * /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core-api >> /home/claude/logs/training-capture.log 2>&1
```
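If a capture pass over many PRs ever runs longer than its interval, cron invocations will stack up. A non-blocking `flock` wrapper (util-linux) makes overlapping runs skip instead; the crontab line below is a hypothetical variant of the entry above:

```shell
# crontab entry (hypothetical): serialize runs with a non-blocking lock
#   */15 * * * * flock -n /tmp/training-capture.lock /home/claude/infra/tasks-agentic/training-data/capture-to-influx.sh dappcore/core >> /home/claude/logs/training-capture.log 2>&1

# flock -n exits non-zero immediately if the lock is already held,
# so a slow capture run never overlaps with the next one:
flock -n /tmp/training-capture.lock -c 'echo "capture running"'
```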
## Recording Actions & Outcomes

### When Orchestrator Takes Action

After any orchestration action, record it:

```bash
# Assumes INFLUX_HOST, INFLUX_ORG, INFLUX_BUCKET, INFLUX_TOKEN are set
# as in capture-to-influx.sh
record_action() {
    local repo=$1 pr=$2 action=$3 reason=$4
    local timestamp=$(date +%s%N)
    local line="action_taken,repo=${repo//\//_},pr=${pr} action=\"${action}\",reason=\"${reason}\" ${timestamp}"
    curl -s -XPOST "${INFLUX_HOST}/api/v2/write?org=${INFLUX_ORG}&bucket=${INFLUX_BUCKET}&precision=ns" \
        -H "Authorization: Token ${INFLUX_TOKEN}" \
        --data-raw "$line"
}

# Examples:
record_action "dappcore/core" 315 "wait" "auto-merge enabled, all checks passing"
record_action "dappcore/core" 307 "request_review_fix" "unresolved threads, attempt 1"
record_action "dappcore/core" 319 "resolve_conflict" "conflict_attempts >= 2, manual resolution"
```
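The example reasons above are safe literals, but a reason that itself contains a double quote would corrupt the string field in the line protocol. A sanitizer sketch (a hypothetical helper, not part of the flow today):

```shell
# escape_field_string: line-protocol string field values require embedded
# backslashes and double quotes to be backslash-escaped. Backslashes go
# first so the quote-escapes we add are not themselves re-escaped.
escape_field_string() {
    printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

escape_field_string 'reviewer said "LGTM?"'   # -> reviewer said \"LGTM?\"
```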
### When PR Resolves

When a PR merges, closes, or is escalated:

```bash
record_outcome() {
    local repo=$1 pr=$2 result=$3 detail=$4 resolution_hours=$5
    local timestamp=$(date +%s%N)
    local line="outcome,repo=${repo//\//_},pr=${pr} result=\"${result}\",detail=\"${detail}\",resolution_hours=${resolution_hours} ${timestamp}"
    curl -s -XPOST "${INFLUX_HOST}/api/v2/write?org=${INFLUX_ORG}&bucket=${INFLUX_BUCKET}&precision=ns" \
        -H "Authorization: Token ${INFLUX_TOKEN}" \
        --data-raw "$line"
}

# Examples:
record_outcome "dappcore/core" 315 "success" "merged via auto-merge" 0.5
record_outcome "dappcore/core" 307 "success" "merged after 2 review fix requests" 4.2
record_outcome "dappcore/core" 291 "escalated" "conflict unresolvable after manual attempt" 72.0
```
## Query Examples

### Flux queries for analysis

```flux
// All observations for a PR over time
from(bucket: "training")
  |> range(start: -7d)
  |> filter(fn: (r) => r._measurement == "pr_observation")
  |> filter(fn: (r) => r.pr == "315")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

// Outcome counts by result (success / escalated / ...)
// NOTE: outcome points carry no action tag; to slice outcomes by action,
// either record the action as a tag on outcome or join with action_taken
// on repo + pr.
from(bucket: "training")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "outcome")
  |> filter(fn: (r) => r._field == "result")
  |> group(columns: ["_value"])
  |> count()

// Average resolution time per repo
from(bucket: "training")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "outcome")
  |> filter(fn: (r) => r._field == "resolution_hours")
  |> group(columns: ["repo"])
  |> mean()
```
## Export for Training

`influx query --raw` emits annotated CSV, not JSON, so its output cannot be piped straight into `jq`; export the CSV and convert it in a separate step:

```bash
# Export observations for the last 90 days as annotated CSV
influx query '
from(bucket: "training")
  |> range(start: -90d)
  |> filter(fn: (r) => r._measurement == "pr_observation")
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
' --raw > training-export.csv
# Then convert the CSV to JSONL as a post-processing step
```
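One rough way to do a CSV-to-JSONL conversion in the shell, assuming the default annotated-CSV layout (`#`-prefixed annotation rows, then a header row, each data row starting with an empty column) and no quoted commas inside values; every field comes out as a JSON string, so numeric typing is lost:

```shell
# csv_to_jsonl: naive annotated-CSV -> JSONL converter (all values as strings)
csv_to_jsonl() {
    awk -F',' '
        /^#/ { next }                                  # skip annotation rows
        !hdr { for (i = 1; i <= NF; i++) h[i] = $i; hdr = 1; next }
        NF > 1 {
            out = "{"
            for (i = 2; i <= NF; i++) {                # col 1 is the leading empty field
                gsub(/"/, "\\\"", $i)
                out = out "\"" h[i] "\":\"" $i "\"" (i < NF ? "," : "")
            }
            print out "}"
        }
    '
}

printf '#datatype,string,long\n,result,pr\n,,315\n' | csv_to_jsonl
# -> {"result":"","pr":"315"}
```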
## Integration with `issue-epic.md`

The issue-epic flow should call `record_action` at each decision point:

- **Step 3 (CI Gate)** - After checking checks: `record_action $REPO $PR "wait" "CI running"`
- **Step 5 (Fix Review)** - After sending fix request: `record_action $REPO $PR "request_review_fix" "unresolved threads"`
- **Step 7 (Update Branch)** - After conflict request: `record_action $REPO $PR "request_conflict_fix" "merge conflict detected"`
- **Step 8 (Merge)** - When PR merges: `record_outcome $REPO $PR "success" "merged" $hours`
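The decision points above can be sketched as a single dispatcher; this is a hypothetical illustration mirroring the example actions, not the issue-epic flow's actual logic:

```shell
# decide_action: map observed signals to one of the recorded action types.
# Hypothetical sketch; real thresholds and ordering belong to issue-epic.md.
decide_action() {
    local checks_failing=$1 threads_unresolved=$2 merge_state=$3
    if [ "$merge_state" = "DIRTY" ]; then
        echo "request_conflict_fix"      # merge conflict detected
    elif [ "$checks_failing" -gt 0 ]; then
        echo "wait"                      # let CI settle before acting
    elif [ "$threads_unresolved" -gt 0 ]; then
        echo "request_review_fix"        # unresolved review threads
    else
        echo "wait"                      # clean and passing: auto-merge lands it
    fi
}

decide_action 0 3 CLEAN   # -> request_review_fix
```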
Created: 2026-02-05 · Part of: agentic pipeline training infrastructure