diff --git a/pkg/mcp/agentic/mirror.go b/pkg/mcp/agentic/mirror.go new file mode 100644 index 0000000..2318c07 --- /dev/null +++ b/pkg/mcp/agentic/mirror.go @@ -0,0 +1,123 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "fmt" + "os/exec" + "path/filepath" + + coreerr "forge.lthn.ai/core/go-log" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// MirrorInput controls Forge to GitHub mirror sync. +type MirrorInput struct { + Repo string `json:"repo,omitempty"` + DryRun bool `json:"dry_run,omitempty"` + MaxFiles int `json:"max_files,omitempty"` +} + +// MirrorOutput reports mirror sync results. +type MirrorOutput struct { + Success bool `json:"success"` + Synced []MirrorSync `json:"synced"` + Skipped []string `json:"skipped,omitempty"` + Count int `json:"count"` +} + +// MirrorSync records one repo sync attempt. +type MirrorSync struct { + Repo string `json:"repo"` + CommitsAhead int `json:"commits_ahead"` + FilesChanged int `json:"files_changed"` + PRURL string `json:"pr_url,omitempty"` + Pushed bool `json:"pushed"` + Skipped string `json:"skipped,omitempty"` +} + +func (s *PrepSubsystem) registerMirrorTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_mirror", + Description: "Mirror Forge repositories to GitHub and open a GitHub PR when there are commits ahead of the remote mirror.", + }, s.mirror) +} + +func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, input MirrorInput) (*mcp.CallToolResult, MirrorOutput, error) { + maxFiles := input.MaxFiles + if maxFiles <= 0 { + maxFiles = 50 + } + + basePath := repoRootFromCodePath(s.codePath) + repos := []string{} + if input.Repo != "" { + repos = []string{input.Repo} + } else { + repos = listLocalRepos(basePath) + } + + synced := make([]MirrorSync, 0, len(repos)) + skipped := make([]string, 0) + + for _, repo := range repos { + repoDir := filepath.Join(basePath, repo) + if !hasRemote(repoDir, "github") { + skipped = append(skipped, repo+": no github remote") + continue + } + + if _, err := exec.LookPath("git"); err != nil { + return nil, MirrorOutput{}, coreerr.E("mirror", "git CLI is not available", err) + } + + _, _ = gitOutput(repoDir, "fetch", "github") + ahead := commitsAhead(repoDir, "github/main", "HEAD") + if ahead <= 0 { + continue + } + + files := filesChanged(repoDir, "github/main", "HEAD") + sync := MirrorSync{ + Repo: repo, + CommitsAhead: ahead, + FilesChanged: files, + } + + if files > maxFiles { + sync.Skipped = fmt.Sprintf("%d files exceeds limit of %d", files, maxFiles) + synced = append(synced, sync) + continue + } + + if input.DryRun { + sync.Skipped = "dry run" + synced = append(synced, sync) + continue + } + + if err := ensureDevBranch(repoDir); err != nil { + sync.Skipped = err.Error() + synced = append(synced, sync) + continue + } + sync.Pushed = true + + prURL, err := createGitHubPR(ctx, repoDir, repo, ahead, files) + if err != nil { + sync.Skipped = err.Error() + } else { + sync.PRURL = prURL + } + + synced = append(synced, sync) + } + + return nil, MirrorOutput{ + Success: true, + Synced: synced, + Skipped: skipped, + Count: len(synced), + }, nil +} diff --git a/pkg/mcp/agentic/prep.go b/pkg/mcp/agentic/prep.go index 5c84a51..93b29ca 100644 --- a/pkg/mcp/agentic/prep.go +++ b/pkg/mcp/agentic/prep.go @@ -135,6 +135,9 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) { s.registerCreatePRTool(server) s.registerListPRsTool(server) s.registerEpicTool(server) + s.registerWatchTool(server) + s.registerReviewQueueTool(server) + s.registerMirrorTool(server) mcp.AddTool(server, &mcp.Tool{ Name: "agentic_scan", diff --git a/pkg/mcp/agentic/repo_helpers.go b/pkg/mcp/agentic/repo_helpers.go new file mode 100644 index 0000000..c4ee4e8 --- /dev/null +++ b/pkg/mcp/agentic/repo_helpers.go @@ -0,0 +1,209 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "encoding/json" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "strings" + "time" + + coreerr "forge.lthn.ai/core/go-log" +) + +func listLocalRepos(basePath string) []string { + entries, err := os.ReadDir(basePath) + if err != nil { + return nil + } + + repos := make([]string, 0, len(entries)) + for _, entry := range entries { + if entry.IsDir() { + repos = append(repos, entry.Name()) + } + } + return repos +} + +func hasRemote(repoDir, remote string) bool { + cmd := exec.Command("git", "remote", "get-url", remote) + cmd.Dir = repoDir + if out, err := cmd.Output(); err == nil { + return strings.TrimSpace(string(out)) != "" + } + return false +} + +func commitsAhead(repoDir, baseRef, headRef string) int { + cmd := exec.Command("git", "rev-list", "--count", baseRef+".."+headRef) + cmd.Dir = repoDir + out, err := cmd.Output() + if err != nil { + return 0 + } + + count, err := parsePositiveInt(strings.TrimSpace(string(out))) + if err != nil { + return 0 + } + return count +} + +func filesChanged(repoDir, baseRef, headRef string) int { + cmd := exec.Command("git", "diff", "--name-only", baseRef+".."+headRef) + cmd.Dir = repoDir + out, err := cmd.Output() + if err != nil { + return 0 + } + + count := 0 + for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") { + if strings.TrimSpace(line) != "" { + count++ + } + } + return count +} + +func gitOutput(repoDir string, args ...string) (string, error) { + cmd := exec.Command("git", args...) + cmd.Dir = repoDir + out, err := cmd.CombinedOutput() + if err != nil { + return "", coreerr.E("gitOutput", string(out), err) + } + return strings.TrimSpace(string(out)), nil +} + +func parsePositiveInt(value string) (int, error) { + value = strings.TrimSpace(value) + if value == "" { + return 0, coreerr.E("parsePositiveInt", "empty value", nil) + } + n := 0 + for _, r := range value { + if r < '0' || r > '9' { + return 0, coreerr.E("parsePositiveInt", "value contains non-numeric characters", nil) + } + n = n*10 + int(r-'0') + } + return n, nil +} + +func readGitHubPRURL(repoDir string) (string, error) { + cmd := exec.Command("gh", "pr", "list", "--head", "dev", "--state", "open", "--json", "url", "--limit", "1") + cmd.Dir = repoDir + out, err := cmd.Output() + if err != nil { + return "", err + } + + var rows []struct { + URL string `json:"url"` + } + if err := json.Unmarshal(out, &rows); err != nil { + return "", err + } + if len(rows) == 0 { + return "", nil + } + return rows[0].URL, nil +} + +func createGitHubPR(ctx context.Context, repoDir, repo string, commits, files int) (string, error) { + if _, err := exec.LookPath("gh"); err != nil { + return "", coreerr.E("createGitHubPR", "gh CLI is not available", err) + } + + if url, err := readGitHubPRURL(repoDir); err == nil && url != "" { + return url, nil + } + + body := "## Forge -> GitHub Sync\n\n" + body += "**Commits:** " + itoa(commits) + "\n" + body += "**Files changed:** " + itoa(files) + "\n\n" + body += "Automated sync from Forge (forge.lthn.ai) to GitHub mirror.\n" + body += "Review with CodeRabbit before merging.\n\n" + body += "---\n" + body += "Co-Authored-By: Virgil " + + title := "[sync] " + repo + ": " + itoa(commits) + " commits, " + itoa(files) + " files" + + cmd := exec.CommandContext(ctx, "gh", "pr", "create", + "--head", "dev", + "--base", "main", + "--title", title, + "--body", body, + ) + cmd.Dir = repoDir + out, err := cmd.CombinedOutput() + if err != nil { + return "", coreerr.E("createGitHubPR", string(out), err) + } + + lines := strings.Split(strings.TrimSpace(string(out)), "\n") + if len(lines) == 0 { + return "", nil + } + return strings.TrimSpace(lines[len(lines)-1]), nil +} + +func ensureDevBranch(repoDir string) error { + cmd := exec.Command("git", "push", "github", "HEAD:refs/heads/dev", "--force") + cmd.Dir = repoDir + out, err := cmd.CombinedOutput() + if err != nil { + return coreerr.E("ensureDevBranch", string(out), err) + } + return nil +} + +func reviewerCommand(ctx context.Context, repoDir, reviewer string) *exec.Cmd { + switch reviewer { + case "coderabbit": + return exec.CommandContext(ctx, "coderabbit", "review") + case "codex": + return exec.CommandContext(ctx, "codex", "review") + case "both": + return exec.CommandContext(ctx, "coderabbit", "review") + default: + return exec.CommandContext(ctx, reviewer) + } +} + +func itoa(value int) string { + return strconv.Itoa(value) +} + +func parseRetryAfter(detail string) time.Duration { + re := regexp.MustCompile(`(?i)(\d+)\s*(minute|minutes|hour|hours|second|seconds)`) + match := re.FindStringSubmatch(detail) + if len(match) != 3 { + return 5 * time.Minute + } + + n, err := strconv.Atoi(match[1]) + if err != nil || n <= 0 { + return 5 * time.Minute + } + + switch strings.ToLower(match[2]) { + case "hour", "hours": + return time.Duration(n) * time.Hour + case "second", "seconds": + return time.Duration(n) * time.Second + default: + return time.Duration(n) * time.Minute + } +} + +func repoRootFromCodePath(codePath string) string { + return filepath.Join(codePath, "core") +} diff --git a/pkg/mcp/agentic/review_queue.go b/pkg/mcp/agentic/review_queue.go new file mode 100644 index 0000000..292501e --- /dev/null +++ b/pkg/mcp/agentic/review_queue.go @@ -0,0 +1,271 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "time" + + coreio "forge.lthn.ai/core/go-io" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// ReviewQueueInput controls the review queue runner. +type ReviewQueueInput struct { + Limit int `json:"limit,omitempty"` + Reviewer string `json:"reviewer,omitempty"` + DryRun bool `json:"dry_run,omitempty"` + LocalOnly bool `json:"local_only,omitempty"` +} + +// ReviewQueueOutput reports what happened. +type ReviewQueueOutput struct { + Success bool `json:"success"` + Processed []ReviewResult `json:"processed"` + Skipped []string `json:"skipped,omitempty"` + RateLimit *RateLimitInfo `json:"rate_limit,omitempty"` +} + +// ReviewResult is the outcome of reviewing one repo. +type ReviewResult struct { + Repo string `json:"repo"` + Verdict string `json:"verdict"` + Findings int `json:"findings"` + Action string `json:"action"` + Detail string `json:"detail,omitempty"` +} + +// RateLimitInfo tracks review rate limit state. +type RateLimitInfo struct { + Limited bool `json:"limited"` + RetryAt time.Time `json:"retry_at,omitempty"` + Message string `json:"message,omitempty"` +} + +func reviewQueueHomeDir() string { + if home := os.Getenv("DIR_HOME"); home != "" { + return home + } + home, _ := os.UserHomeDir() + return home +} + +func (s *PrepSubsystem) registerReviewQueueTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_review_queue", + Description: "Process repositories that are ahead of the GitHub mirror and summarise review findings.", + }, s.reviewQueue) +} + +func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, input ReviewQueueInput) (*mcp.CallToolResult, ReviewQueueOutput, error) { + limit := input.Limit + if limit <= 0 { + limit = 4 + } + + basePath := repoRootFromCodePath(s.codePath) + candidates := s.findReviewCandidates(basePath) + if len(candidates) == 0 { + return nil, ReviewQueueOutput{Success: true, Processed: []ReviewResult{}}, nil + } + + processed := make([]ReviewResult, 0, len(candidates)) + skipped := make([]string, 0) + var rateInfo *RateLimitInfo + + for _, repo := range candidates { + if len(processed) >= limit { + skipped = append(skipped, repo+" (limit reached)") + continue + } + + if rateInfo != nil && rateInfo.Limited && time.Now().Before(rateInfo.RetryAt) { + skipped = append(skipped, repo+" (rate limited)") + continue + } + + repoDir := filepath.Join(basePath, repo) + reviewer := input.Reviewer + if reviewer == "" { + reviewer = "coderabbit" + } + + result := s.reviewRepo(ctx, repoDir, repo, reviewer, input.DryRun, input.LocalOnly) + if result.Verdict == "rate_limited" { + retryAfter := parseRetryAfter(result.Detail) + rateInfo = &RateLimitInfo{ + Limited: true, + RetryAt: time.Now().Add(retryAfter), + Message: result.Detail, + } + skipped = append(skipped, repo+" (rate limited)") + continue + } + + processed = append(processed, result) + } + + if rateInfo != nil { + s.saveRateLimitState(rateInfo) + } + + return nil, ReviewQueueOutput{ + Success: true, + Processed: processed, + Skipped: skipped, + RateLimit: rateInfo, + }, nil +} + +func (s *PrepSubsystem) findReviewCandidates(basePath string) []string { + entries, err := os.ReadDir(basePath) + if err != nil { + return nil + } + + candidates := make([]string, 0, len(entries)) + for _, entry := range entries { + if !entry.IsDir() { + continue + } + repoDir := filepath.Join(basePath, entry.Name()) + if !hasRemote(repoDir, "github") { + continue + } + if commitsAhead(repoDir, "github/main", "HEAD") <= 0 { + continue + } + candidates = append(candidates, entry.Name()) + } + return candidates +} + +func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer string, dryRun, localOnly bool) ReviewResult { + result := ReviewResult{Repo: repo} + + if rl := s.loadRateLimitState(); rl != nil && rl.Limited && time.Now().Before(rl.RetryAt) { + result.Verdict = "rate_limited" + result.Detail = fmt.Sprintf("retry after %s", rl.RetryAt.Format(time.RFC3339)) + return result + } + + cmd := reviewerCommand(ctx, repoDir, reviewer) + cmd.Dir = repoDir + out, err := cmd.CombinedOutput() + output := strings.TrimSpace(string(out)) + + if strings.Contains(strings.ToLower(output), "rate limit") { + result.Verdict = "rate_limited" + result.Detail = output + return result + } + + if err != nil && !strings.Contains(output, "No findings") && !strings.Contains(output, "no issues") { + result.Verdict = "error" + if output != "" { + result.Detail = output + } else { + result.Detail = err.Error() + } + return result + } + + s.storeReviewOutput(repoDir, repo, reviewer, output) + result.Findings = countFindingHints(output) + + if strings.Contains(output, "No findings") || strings.Contains(output, "no issues") || strings.Contains(output, "LGTM") { + result.Verdict = "clean" + if dryRun { + result.Action = "skipped (dry run)" + return result + } + if localOnly { + result.Action = "local only" + return result + } + + if url, err := readGitHubPRURL(repoDir); err == nil && url != "" { + mergeCmd := exec.CommandContext(ctx, "gh", "pr", "merge", "--auto", "--squash", "--delete-branch") + mergeCmd.Dir = repoDir + if mergeOut, err := mergeCmd.CombinedOutput(); err == nil { + result.Action = "merged" + result.Detail = strings.TrimSpace(string(mergeOut)) + return result + } + } + + result.Action = "waiting" + return result + } + + result.Verdict = "findings" + if dryRun { + result.Action = "skipped (dry run)" + return result + } + + result.Action = "waiting" + return result +} + +func (s *PrepSubsystem) storeReviewOutput(repoDir, repo, reviewer, output string) { + home := reviewQueueHomeDir() + dataDir := filepath.Join(home, ".core", "training", "reviews") + if err := coreio.Local.EnsureDir(dataDir); err != nil { + return + } + + payload := map[string]string{ + "repo": repo, + "reviewer": reviewer, + "output": output, + "source": repoDir, + } + data, err := json.MarshalIndent(payload, "", " ") + if err != nil { + return + } + + name := fmt.Sprintf("%s-%s-%d.json", repo, reviewer, time.Now().Unix()) + _ = coreio.Local.Write(filepath.Join(dataDir, name), string(data)) +} + +func (s *PrepSubsystem) saveRateLimitState(info *RateLimitInfo) { + home := reviewQueueHomeDir() + path := filepath.Join(home, ".core", "coderabbit-ratelimit.json") + data, err := json.Marshal(info) + if err != nil { + return + } + _ = coreio.Local.Write(path, string(data)) +} + +func (s *PrepSubsystem) loadRateLimitState() *RateLimitInfo { + home := reviewQueueHomeDir() + path := filepath.Join(home, ".core", "coderabbit-ratelimit.json") + data, err := coreio.Local.Read(path) + if err != nil { + return nil + } + + var info RateLimitInfo + if err := json.Unmarshal([]byte(data), &info); err != nil { + return nil + } + if !info.Limited { + return nil + } + return &info +} + +func countFindingHints(output string) int { + re := regexp.MustCompile(`(?m)[^ \t\n\r]+\.(?:go|php|ts|tsx|js|jsx|py|rb|java|cs|cpp|cxx|cc|md):\d+`) + return len(re.FindAllString(output, -1)) +} diff --git a/pkg/mcp/agentic/watch.go b/pkg/mcp/agentic/watch.go new file mode 100644 index 0000000..301f66c --- /dev/null +++ b/pkg/mcp/agentic/watch.go @@ -0,0 +1,157 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "path/filepath" + "time" + + coreerr "forge.lthn.ai/core/go-log" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// WatchInput is the input for agentic_watch. +type WatchInput struct { + Workspaces []string `json:"workspaces,omitempty"` + PollInterval int `json:"poll_interval,omitempty"` + Timeout int `json:"timeout,omitempty"` +} + +// WatchOutput is the result of watching one or more workspaces. +type WatchOutput struct { + Success bool `json:"success"` + Completed []WatchResult `json:"completed"` + Failed []WatchResult `json:"failed,omitempty"` + Duration string `json:"duration"` +} + +// WatchResult describes one workspace result. +type WatchResult struct { + Workspace string `json:"workspace"` + Agent string `json:"agent"` + Repo string `json:"repo"` + Status string `json:"status"` + PRURL string `json:"pr_url,omitempty"` +} + +func (s *PrepSubsystem) registerWatchTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_watch", + Description: "Watch running or queued agent workspaces until they finish and return a completion summary.", + }, s.watch) +} + +func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, input WatchInput) (*mcp.CallToolResult, WatchOutput, error) { + pollInterval := time.Duration(input.PollInterval) * time.Second + if pollInterval <= 0 { + pollInterval = 5 * time.Second + } + + timeout := time.Duration(input.Timeout) * time.Second + if timeout <= 0 { + timeout = 10 * time.Minute + } + + start := time.Now() + deadline := start.Add(timeout) + + targets := input.Workspaces + if len(targets) == 0 { + targets = s.findActiveWorkspaces() + } + + if len(targets) == 0 { + return nil, WatchOutput{Success: true, Duration: "0s"}, nil + } + + remaining := make(map[string]struct{}, len(targets)) + for _, workspace := range targets { + remaining[workspace] = struct{}{} + } + + completed := make([]WatchResult, 0, len(targets)) + failed := make([]WatchResult, 0) + + for len(remaining) > 0 { + if time.Now().After(deadline) { + for workspace := range remaining { + failed = append(failed, WatchResult{ + Workspace: workspace, + Status: "timeout", + }) + } + break + } + + select { + case <-ctx.Done(): + return nil, WatchOutput{}, coreerr.E("watch", "cancelled", ctx.Err()) + case <-time.After(pollInterval): + } + + _, statusOut, err := s.status(ctx, req, StatusInput{}) + if err != nil { + return nil, WatchOutput{}, coreerr.E("watch", "failed to refresh status", err) + } + + for _, info := range statusOut.Workspaces { + if _, ok := remaining[info.Name]; !ok { + continue + } + + switch info.Status { + case "completed", "merged", "ready-for-review": + completed = append(completed, WatchResult{ + Workspace: info.Name, + Agent: info.Agent, + Repo: info.Repo, + Status: info.Status, + }) + delete(remaining, info.Name) + case "failed", "blocked": + failed = append(failed, WatchResult{ + Workspace: info.Name, + Agent: info.Agent, + Repo: info.Repo, + Status: info.Status, + }) + delete(remaining, info.Name) + } + } + } + + return nil, WatchOutput{ + Success: len(failed) == 0, + Completed: completed, + Failed: failed, + Duration: time.Since(start).Round(time.Second).String(), + }, nil +} + +func (s *PrepSubsystem) findActiveWorkspaces() []string { + wsDirs := s.listWorkspaceDirs() + if len(wsDirs) == 0 { + return nil + } + + active := make([]string, 0, len(wsDirs)) + for _, wsDir := range wsDirs { + st, err := readStatus(wsDir) + if err != nil { + continue + } + switch st.Status { + case "running", "queued": + active = append(active, filepath.Base(wsDir)) + } + } + return active +} + +func (s *PrepSubsystem) resolveWorkspaceDir(name string) string { + if filepath.IsAbs(name) { + return name + } + return filepath.Join(s.workspaceRoot(), name) +}