feat(agentic): add watch, mirror, and review queue tools

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-02 12:16:17 +00:00
parent e40b05c900
commit e138af6635
5 changed files with 763 additions and 0 deletions

123
pkg/mcp/agentic/mirror.go Normal file
View file

@ -0,0 +1,123 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"fmt"
"os/exec"
"path/filepath"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// MirrorInput controls Forge to GitHub mirror sync.
type MirrorInput struct {
Repo string `json:"repo,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
MaxFiles int `json:"max_files,omitempty"`
}
// MirrorOutput reports mirror sync results.
type MirrorOutput struct {
Success bool `json:"success"`
Synced []MirrorSync `json:"synced"`
Skipped []string `json:"skipped,omitempty"`
Count int `json:"count"`
}
// MirrorSync records one repo sync attempt.
type MirrorSync struct {
Repo string `json:"repo"`
CommitsAhead int `json:"commits_ahead"`
FilesChanged int `json:"files_changed"`
PRURL string `json:"pr_url,omitempty"`
Pushed bool `json:"pushed"`
Skipped string `json:"skipped,omitempty"`
}
func (s *PrepSubsystem) registerMirrorTool(server *mcp.Server) {
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_mirror",
Description: "Mirror Forge repositories to GitHub and open a GitHub PR when there are commits ahead of the remote mirror.",
}, s.mirror)
}
func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, input MirrorInput) (*mcp.CallToolResult, MirrorOutput, error) {
maxFiles := input.MaxFiles
if maxFiles <= 0 {
maxFiles = 50
}
basePath := repoRootFromCodePath(s.codePath)
repos := []string{}
if input.Repo != "" {
repos = []string{input.Repo}
} else {
repos = listLocalRepos(basePath)
}
synced := make([]MirrorSync, 0, len(repos))
skipped := make([]string, 0)
for _, repo := range repos {
repoDir := filepath.Join(basePath, repo)
if !hasRemote(repoDir, "github") {
skipped = append(skipped, repo+": no github remote")
continue
}
if _, err := exec.LookPath("git"); err != nil {
return nil, MirrorOutput{}, coreerr.E("mirror", "git CLI is not available", err)
}
_, _ = gitOutput(repoDir, "fetch", "github")
ahead := commitsAhead(repoDir, "github/main", "HEAD")
if ahead <= 0 {
continue
}
files := filesChanged(repoDir, "github/main", "HEAD")
sync := MirrorSync{
Repo: repo,
CommitsAhead: ahead,
FilesChanged: files,
}
if files > maxFiles {
sync.Skipped = fmt.Sprintf("%d files exceeds limit of %d", files, maxFiles)
synced = append(synced, sync)
continue
}
if input.DryRun {
sync.Skipped = "dry run"
synced = append(synced, sync)
continue
}
if err := ensureDevBranch(repoDir); err != nil {
sync.Skipped = err.Error()
synced = append(synced, sync)
continue
}
sync.Pushed = true
prURL, err := createGitHubPR(ctx, repoDir, repo, ahead, files)
if err != nil {
sync.Skipped = err.Error()
} else {
sync.PRURL = prURL
}
synced = append(synced, sync)
}
return nil, MirrorOutput{
Success: true,
Synced: synced,
Skipped: skipped,
Count: len(synced),
}, nil
}

View file

@ -135,6 +135,9 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) {
s.registerCreatePRTool(server)
s.registerListPRsTool(server)
s.registerEpicTool(server)
s.registerWatchTool(server)
s.registerReviewQueueTool(server)
s.registerMirrorTool(server)
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_scan",

View file

@ -0,0 +1,209 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"encoding/json"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
coreerr "forge.lthn.ai/core/go-log"
)
func listLocalRepos(basePath string) []string {
entries, err := os.ReadDir(basePath)
if err != nil {
return nil
}
repos := make([]string, 0, len(entries))
for _, entry := range entries {
if entry.IsDir() {
repos = append(repos, entry.Name())
}
}
return repos
}
func hasRemote(repoDir, remote string) bool {
cmd := exec.Command("git", "remote", "get-url", remote)
cmd.Dir = repoDir
if out, err := cmd.Output(); err == nil {
return strings.TrimSpace(string(out)) != ""
}
return false
}
func commitsAhead(repoDir, baseRef, headRef string) int {
cmd := exec.Command("git", "rev-list", "--count", baseRef+".."+headRef)
cmd.Dir = repoDir
out, err := cmd.Output()
if err != nil {
return 0
}
count, err := parsePositiveInt(strings.TrimSpace(string(out)))
if err != nil {
return 0
}
return count
}
func filesChanged(repoDir, baseRef, headRef string) int {
cmd := exec.Command("git", "diff", "--name-only", baseRef+".."+headRef)
cmd.Dir = repoDir
out, err := cmd.Output()
if err != nil {
return 0
}
count := 0
for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
if strings.TrimSpace(line) != "" {
count++
}
}
return count
}
func gitOutput(repoDir string, args ...string) (string, error) {
cmd := exec.Command("git", args...)
cmd.Dir = repoDir
out, err := cmd.CombinedOutput()
if err != nil {
return "", coreerr.E("gitOutput", string(out), err)
}
return strings.TrimSpace(string(out)), nil
}
func parsePositiveInt(value string) (int, error) {
value = strings.TrimSpace(value)
if value == "" {
return 0, coreerr.E("parsePositiveInt", "empty value", nil)
}
n := 0
for _, r := range value {
if r < '0' || r > '9' {
return 0, coreerr.E("parsePositiveInt", "value contains non-numeric characters", nil)
}
n = n*10 + int(r-'0')
}
return n, nil
}
func readGitHubPRURL(repoDir string) (string, error) {
cmd := exec.Command("gh", "pr", "list", "--head", "dev", "--state", "open", "--json", "url", "--limit", "1")
cmd.Dir = repoDir
out, err := cmd.Output()
if err != nil {
return "", err
}
var rows []struct {
URL string `json:"url"`
}
if err := json.Unmarshal(out, &rows); err != nil {
return "", err
}
if len(rows) == 0 {
return "", nil
}
return rows[0].URL, nil
}
func createGitHubPR(ctx context.Context, repoDir, repo string, commits, files int) (string, error) {
if _, err := exec.LookPath("gh"); err != nil {
return "", coreerr.E("createGitHubPR", "gh CLI is not available", err)
}
if url, err := readGitHubPRURL(repoDir); err == nil && url != "" {
return url, nil
}
body := "## Forge -> GitHub Sync\n\n"
body += "**Commits:** " + itoa(commits) + "\n"
body += "**Files changed:** " + itoa(files) + "\n\n"
body += "Automated sync from Forge (forge.lthn.ai) to GitHub mirror.\n"
body += "Review with CodeRabbit before merging.\n\n"
body += "---\n"
body += "Co-Authored-By: Virgil <virgil@lethean.io>"
title := "[sync] " + repo + ": " + itoa(commits) + " commits, " + itoa(files) + " files"
cmd := exec.CommandContext(ctx, "gh", "pr", "create",
"--head", "dev",
"--base", "main",
"--title", title,
"--body", body,
)
cmd.Dir = repoDir
out, err := cmd.CombinedOutput()
if err != nil {
return "", coreerr.E("createGitHubPR", string(out), err)
}
lines := strings.Split(strings.TrimSpace(string(out)), "\n")
if len(lines) == 0 {
return "", nil
}
return strings.TrimSpace(lines[len(lines)-1]), nil
}
func ensureDevBranch(repoDir string) error {
cmd := exec.Command("git", "push", "github", "HEAD:refs/heads/dev", "--force")
cmd.Dir = repoDir
out, err := cmd.CombinedOutput()
if err != nil {
return coreerr.E("ensureDevBranch", string(out), err)
}
return nil
}
func reviewerCommand(ctx context.Context, repoDir, reviewer string) *exec.Cmd {
switch reviewer {
case "coderabbit":
return exec.CommandContext(ctx, "coderabbit", "review")
case "codex":
return exec.CommandContext(ctx, "codex", "review")
case "both":
return exec.CommandContext(ctx, "coderabbit", "review")
default:
return exec.CommandContext(ctx, reviewer)
}
}
func itoa(value int) string {
return strconv.Itoa(value)
}
func parseRetryAfter(detail string) time.Duration {
re := regexp.MustCompile(`(?i)(\d+)\s*(minute|minutes|hour|hours|second|seconds)`)
match := re.FindStringSubmatch(detail)
if len(match) != 3 {
return 5 * time.Minute
}
n, err := strconv.Atoi(match[1])
if err != nil || n <= 0 {
return 5 * time.Minute
}
switch strings.ToLower(match[2]) {
case "hour", "hours":
return time.Duration(n) * time.Hour
case "second", "seconds":
return time.Duration(n) * time.Second
default:
return time.Duration(n) * time.Minute
}
}
func repoRootFromCodePath(codePath string) string {
return filepath.Join(codePath, "core")
}

View file

@ -0,0 +1,271 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"time"
coreio "forge.lthn.ai/core/go-io"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// ReviewQueueInput controls the review queue runner.
type ReviewQueueInput struct {
Limit int `json:"limit,omitempty"`
Reviewer string `json:"reviewer,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
LocalOnly bool `json:"local_only,omitempty"`
}
// ReviewQueueOutput reports what happened.
type ReviewQueueOutput struct {
Success bool `json:"success"`
Processed []ReviewResult `json:"processed"`
Skipped []string `json:"skipped,omitempty"`
RateLimit *RateLimitInfo `json:"rate_limit,omitempty"`
}
// ReviewResult is the outcome of reviewing one repo.
type ReviewResult struct {
Repo string `json:"repo"`
Verdict string `json:"verdict"`
Findings int `json:"findings"`
Action string `json:"action"`
Detail string `json:"detail,omitempty"`
}
// RateLimitInfo tracks review rate limit state.
type RateLimitInfo struct {
Limited bool `json:"limited"`
RetryAt time.Time `json:"retry_at,omitempty"`
Message string `json:"message,omitempty"`
}
func reviewQueueHomeDir() string {
if home := os.Getenv("DIR_HOME"); home != "" {
return home
}
home, _ := os.UserHomeDir()
return home
}
func (s *PrepSubsystem) registerReviewQueueTool(server *mcp.Server) {
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_review_queue",
Description: "Process repositories that are ahead of the GitHub mirror and summarise review findings.",
}, s.reviewQueue)
}
func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, input ReviewQueueInput) (*mcp.CallToolResult, ReviewQueueOutput, error) {
limit := input.Limit
if limit <= 0 {
limit = 4
}
basePath := repoRootFromCodePath(s.codePath)
candidates := s.findReviewCandidates(basePath)
if len(candidates) == 0 {
return nil, ReviewQueueOutput{Success: true, Processed: []ReviewResult{}}, nil
}
processed := make([]ReviewResult, 0, len(candidates))
skipped := make([]string, 0)
var rateInfo *RateLimitInfo
for _, repo := range candidates {
if len(processed) >= limit {
skipped = append(skipped, repo+" (limit reached)")
continue
}
if rateInfo != nil && rateInfo.Limited && time.Now().Before(rateInfo.RetryAt) {
skipped = append(skipped, repo+" (rate limited)")
continue
}
repoDir := filepath.Join(basePath, repo)
reviewer := input.Reviewer
if reviewer == "" {
reviewer = "coderabbit"
}
result := s.reviewRepo(ctx, repoDir, repo, reviewer, input.DryRun, input.LocalOnly)
if result.Verdict == "rate_limited" {
retryAfter := parseRetryAfter(result.Detail)
rateInfo = &RateLimitInfo{
Limited: true,
RetryAt: time.Now().Add(retryAfter),
Message: result.Detail,
}
skipped = append(skipped, repo+" (rate limited)")
continue
}
processed = append(processed, result)
}
if rateInfo != nil {
s.saveRateLimitState(rateInfo)
}
return nil, ReviewQueueOutput{
Success: true,
Processed: processed,
Skipped: skipped,
RateLimit: rateInfo,
}, nil
}
func (s *PrepSubsystem) findReviewCandidates(basePath string) []string {
entries, err := os.ReadDir(basePath)
if err != nil {
return nil
}
candidates := make([]string, 0, len(entries))
for _, entry := range entries {
if !entry.IsDir() {
continue
}
repoDir := filepath.Join(basePath, entry.Name())
if !hasRemote(repoDir, "github") {
continue
}
if commitsAhead(repoDir, "github/main", "HEAD") <= 0 {
continue
}
candidates = append(candidates, entry.Name())
}
return candidates
}
func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer string, dryRun, localOnly bool) ReviewResult {
result := ReviewResult{Repo: repo}
if rl := s.loadRateLimitState(); rl != nil && rl.Limited && time.Now().Before(rl.RetryAt) {
result.Verdict = "rate_limited"
result.Detail = fmt.Sprintf("retry after %s", rl.RetryAt.Format(time.RFC3339))
return result
}
cmd := reviewerCommand(ctx, repoDir, reviewer)
cmd.Dir = repoDir
out, err := cmd.CombinedOutput()
output := strings.TrimSpace(string(out))
if strings.Contains(strings.ToLower(output), "rate limit") {
result.Verdict = "rate_limited"
result.Detail = output
return result
}
if err != nil && !strings.Contains(output, "No findings") && !strings.Contains(output, "no issues") {
result.Verdict = "error"
if output != "" {
result.Detail = output
} else {
result.Detail = err.Error()
}
return result
}
s.storeReviewOutput(repoDir, repo, reviewer, output)
result.Findings = countFindingHints(output)
if strings.Contains(output, "No findings") || strings.Contains(output, "no issues") || strings.Contains(output, "LGTM") {
result.Verdict = "clean"
if dryRun {
result.Action = "skipped (dry run)"
return result
}
if localOnly {
result.Action = "local only"
return result
}
if url, err := readGitHubPRURL(repoDir); err == nil && url != "" {
mergeCmd := exec.CommandContext(ctx, "gh", "pr", "merge", "--auto", "--squash", "--delete-branch")
mergeCmd.Dir = repoDir
if mergeOut, err := mergeCmd.CombinedOutput(); err == nil {
result.Action = "merged"
result.Detail = strings.TrimSpace(string(mergeOut))
return result
}
}
result.Action = "waiting"
return result
}
result.Verdict = "findings"
if dryRun {
result.Action = "skipped (dry run)"
return result
}
result.Action = "waiting"
return result
}
func (s *PrepSubsystem) storeReviewOutput(repoDir, repo, reviewer, output string) {
home := reviewQueueHomeDir()
dataDir := filepath.Join(home, ".core", "training", "reviews")
if err := coreio.Local.EnsureDir(dataDir); err != nil {
return
}
payload := map[string]string{
"repo": repo,
"reviewer": reviewer,
"output": output,
"source": repoDir,
}
data, err := json.MarshalIndent(payload, "", " ")
if err != nil {
return
}
name := fmt.Sprintf("%s-%s-%d.json", repo, reviewer, time.Now().Unix())
_ = coreio.Local.Write(filepath.Join(dataDir, name), string(data))
}
func (s *PrepSubsystem) saveRateLimitState(info *RateLimitInfo) {
home := reviewQueueHomeDir()
path := filepath.Join(home, ".core", "coderabbit-ratelimit.json")
data, err := json.Marshal(info)
if err != nil {
return
}
_ = coreio.Local.Write(path, string(data))
}
func (s *PrepSubsystem) loadRateLimitState() *RateLimitInfo {
home := reviewQueueHomeDir()
path := filepath.Join(home, ".core", "coderabbit-ratelimit.json")
data, err := coreio.Local.Read(path)
if err != nil {
return nil
}
var info RateLimitInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
return nil
}
if !info.Limited {
return nil
}
return &info
}
func countFindingHints(output string) int {
re := regexp.MustCompile(`(?m)[^ \t\n\r]+\.(?:go|php|ts|tsx|js|jsx|py|rb|java|cs|cpp|cxx|cc|md):\d+`)
return len(re.FindAllString(output, -1))
}

157
pkg/mcp/agentic/watch.go Normal file
View file

@ -0,0 +1,157 @@
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"path/filepath"
"time"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// WatchInput is the input for agentic_watch.
type WatchInput struct {
Workspaces []string `json:"workspaces,omitempty"`
PollInterval int `json:"poll_interval,omitempty"`
Timeout int `json:"timeout,omitempty"`
}
// WatchOutput is the result of watching one or more workspaces.
type WatchOutput struct {
Success bool `json:"success"`
Completed []WatchResult `json:"completed"`
Failed []WatchResult `json:"failed,omitempty"`
Duration string `json:"duration"`
}
// WatchResult describes one workspace result.
type WatchResult struct {
Workspace string `json:"workspace"`
Agent string `json:"agent"`
Repo string `json:"repo"`
Status string `json:"status"`
PRURL string `json:"pr_url,omitempty"`
}
func (s *PrepSubsystem) registerWatchTool(server *mcp.Server) {
mcp.AddTool(server, &mcp.Tool{
Name: "agentic_watch",
Description: "Watch running or queued agent workspaces until they finish and return a completion summary.",
}, s.watch)
}
func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, input WatchInput) (*mcp.CallToolResult, WatchOutput, error) {
pollInterval := time.Duration(input.PollInterval) * time.Second
if pollInterval <= 0 {
pollInterval = 5 * time.Second
}
timeout := time.Duration(input.Timeout) * time.Second
if timeout <= 0 {
timeout = 10 * time.Minute
}
start := time.Now()
deadline := start.Add(timeout)
targets := input.Workspaces
if len(targets) == 0 {
targets = s.findActiveWorkspaces()
}
if len(targets) == 0 {
return nil, WatchOutput{Success: true, Duration: "0s"}, nil
}
remaining := make(map[string]struct{}, len(targets))
for _, workspace := range targets {
remaining[workspace] = struct{}{}
}
completed := make([]WatchResult, 0, len(targets))
failed := make([]WatchResult, 0)
for len(remaining) > 0 {
if time.Now().After(deadline) {
for workspace := range remaining {
failed = append(failed, WatchResult{
Workspace: workspace,
Status: "timeout",
})
}
break
}
select {
case <-ctx.Done():
return nil, WatchOutput{}, coreerr.E("watch", "cancelled", ctx.Err())
case <-time.After(pollInterval):
}
_, statusOut, err := s.status(ctx, req, StatusInput{})
if err != nil {
return nil, WatchOutput{}, coreerr.E("watch", "failed to refresh status", err)
}
for _, info := range statusOut.Workspaces {
if _, ok := remaining[info.Name]; !ok {
continue
}
switch info.Status {
case "completed", "merged", "ready-for-review":
completed = append(completed, WatchResult{
Workspace: info.Name,
Agent: info.Agent,
Repo: info.Repo,
Status: info.Status,
})
delete(remaining, info.Name)
case "failed", "blocked":
failed = append(failed, WatchResult{
Workspace: info.Name,
Agent: info.Agent,
Repo: info.Repo,
Status: info.Status,
})
delete(remaining, info.Name)
}
}
}
return nil, WatchOutput{
Success: len(failed) == 0,
Completed: completed,
Failed: failed,
Duration: time.Since(start).Round(time.Second).String(),
}, nil
}
func (s *PrepSubsystem) findActiveWorkspaces() []string {
wsDirs := s.listWorkspaceDirs()
if len(wsDirs) == 0 {
return nil
}
active := make([]string, 0, len(wsDirs))
for _, wsDir := range wsDirs {
st, err := readStatus(wsDir)
if err != nil {
continue
}
switch st.Status {
case "running", "queued":
active = append(active, filepath.Base(wsDir))
}
}
return active
}
func (s *PrepSubsystem) resolveWorkspaceDir(name string) string {
if filepath.IsAbs(name) {
return name
}
return filepath.Join(s.workspaceRoot(), name)
}