diff --git a/CODEX.md b/CODEX.md new file mode 100644 index 0000000..01fd497 --- /dev/null +++ b/CODEX.md @@ -0,0 +1,56 @@ +# CODEX.md + +Instructions for OpenAI Codex when working in the Core ecosystem. + +## MCP Tools Available + +You have access to core-agent MCP tools. Use them: + +- `brain_recall` — Search OpenBrain for context about any package, pattern, or decision +- `brain_remember` — Store what you learn for other agents (Claude, Gemini, future LEM) +- `agentic_dispatch` — Dispatch tasks to other agents +- `agentic_status` — Check agent workspace status + +**ALWAYS `brain_remember` significant findings** — your deep analysis of package internals, error patterns, security observations. This builds the shared knowledge base. + +## Core Ecosystem Conventions + +### Go Packages (forge.lthn.ai/core/*) + +- **Error handling**: `coreerr.E("pkg.Method", "what failed", err)` from `go-log`. NEVER `fmt.Errorf` or `errors.New`. + - Import as: `coreerr "forge.lthn.ai/core/go-log"` + - Always 3 args: operation, message, cause (use `nil` if no cause) + - `coreerr.E` returns `*log.Err` which implements `error` and `Unwrap()` + +- **File I/O**: `coreio.Local.Read/Write/Delete/EnsureDir` from `go-io`. NEVER `os.ReadFile/WriteFile/MkdirAll`. + - Import as: `coreio "forge.lthn.ai/core/go-io"` + - Security: go-io validates paths, prevents traversal + +- **Process management**: `go-process` for spawning external commands. Supports Timeout, GracePeriod, KillGroup. + +- **UK English**: colour, organisation, centre, initialise (never American spellings) + +- **Test naming**: `TestFoo_Good` (happy path), `TestFoo_Bad` (expected errors), `TestFoo_Ugly` (panics/edge cases) + +- **Commits**: `type(scope): description` with `Co-Authored-By: Virgil ` + +### PHP Packages (CorePHP) + +- **Actions pattern**: Single-purpose classes with `use Action` trait, static `::run()` helper +- **Tenant isolation**: `BelongsToWorkspace` trait on ALL models with tenant data +- **Strict types**: `declare(strict_types=1)` in every file +- **Testing**: Pest syntax, not PHPUnit + +## Review Focus Areas + +When reviewing code, prioritise: + +1. **Security**: Path traversal, injection, hardcoded secrets, unsafe input +2. **Error handling**: coreerr.E() convention compliance +3. **File I/O**: go-io usage, no raw os.* calls +4. **Tenant isolation**: BelongsToWorkspace on all tenant models (PHP) +5. **Test coverage**: Are critical paths tested? + +## Training Data + +Your reviews generate training data for LEM (our fine-tuned model). Be thorough and structured in your findings — every observation helps improve the next generation of reviews. diff --git a/claude/core/.claude-plugin/plugin.json b/claude/core/.claude-plugin/plugin.json index 4eb27ed..8f3dbfa 100644 --- a/claude/core/.claude-plugin/plugin.json +++ b/claude/core/.claude-plugin/plugin.json @@ -1,7 +1,7 @@ { "name": "core", "description": "Core agent platform — dispatch, watch, monitor, status, review, scan, messaging, PR automation, sandboxed agents", - "version": "0.7.0", + "version": "0.8.0", "author": { "name": "Lethean", "email": "hello@host.uk.com" @@ -15,5 +15,11 @@ "data-collection", "cryptocurrency", "archive" + ], + "hooks": [ + { + "event": "PostToolUse", + "script": "${CLAUDE_PLUGIN_ROOT}/scripts/check-notify.sh" + } ] } diff --git a/claude/core/scripts/check-notify.sh b/claude/core/scripts/check-notify.sh new file mode 100755 index 0000000..17774b4 --- /dev/null +++ b/claude/core/scripts/check-notify.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# Lightweight inbox notification check for PostToolUse hook. +# Reads a marker file written by the monitor subsystem. +# If marker exists, outputs the notification and removes the file. +# Zero API calls — just a file stat. + +NOTIFY_FILE="/tmp/claude-inbox-notify" + +if [ -f "$NOTIFY_FILE" ]; then + cat "$NOTIFY_FILE" + rm -f "$NOTIFY_FILE" +fi diff --git a/cmd/core-agent/main.go b/cmd/core-agent/main.go index 5ebd5b5..018b58d 100644 --- a/cmd/core-agent/main.go +++ b/cmd/core-agent/main.go @@ -1,7 +1,10 @@ package main import ( + "fmt" "log" + "os" + "path/filepath" "forge.lthn.ai/core/agent/pkg/agentic" "forge.lthn.ai/core/agent/pkg/brain" @@ -15,40 +18,104 @@ import ( func main() { if err := cli.Init(cli.Options{ AppName: "core-agent", - Version: "0.1.0", + Version: "0.2.0", }); err != nil { log.Fatal(err) } - mcpCmd := cli.NewCommand("mcp", "Start the MCP server on stdio", "", func(cmd *cli.Command, args []string) error { - // Initialise go-process so dispatch can spawn agents + // Shared setup for both mcp and serve commands + initServices := func() (*mcp.Service, *monitor.Subsystem, error) { c, err := core.New(core.WithName("process", process.NewService(process.Options{}))) if err != nil { - return cli.Wrap(err, "init core") + return nil, nil, cli.Wrap(err, "init core") } procSvc, err := core.ServiceFor[*process.Service](c, "process") if err != nil { - return cli.Wrap(err, "get process service") + return nil, nil, cli.Wrap(err, "get process service") } process.SetDefault(procSvc) mon := monitor.New() + prep := agentic.NewPrep() + prep.SetCompletionNotifier(mon) + mcpSvc, err := mcp.New( mcp.WithSubsystem(brain.NewDirect()), - mcp.WithSubsystem(agentic.NewPrep()), + mcp.WithSubsystem(prep), mcp.WithSubsystem(mon), ) if err != nil { - return cli.Wrap(err, "create MCP service") + return nil, nil, cli.Wrap(err, "create MCP service") } - // Start background monitor after MCP server is running + return mcpSvc, mon, nil + } + + // mcp — stdio transport (Claude Code integration) + mcpCmd := cli.NewCommand("mcp", "Start the MCP server on stdio", "", func(cmd *cli.Command, args []string) error { + mcpSvc, mon, err := initServices() + if err != nil { + return err + } + mon.Start(cmd.Context()) + return mcpSvc.Run(cmd.Context()) + }) + + // serve — persistent HTTP daemon (Charon, CI, cross-agent) + serveCmd := cli.NewCommand("serve", "Start as a persistent HTTP daemon", "", func(cmd *cli.Command, args []string) error { + mcpSvc, mon, err := initServices() + if err != nil { + return err + } + + // Determine address + addr := os.Getenv("MCP_HTTP_ADDR") + if addr == "" { + addr = "0.0.0.0:9101" + } + + // Determine health address + healthAddr := os.Getenv("HEALTH_ADDR") + if healthAddr == "" { + healthAddr = "0.0.0.0:9102" + } + + // Set up daemon with PID file, health check, and registry + home, _ := os.UserHomeDir() + pidFile := filepath.Join(home, ".core", "core-agent.pid") + + daemon := process.NewDaemon(process.DaemonOptions{ + PIDFile: pidFile, + HealthAddr: healthAddr, + Registry: process.DefaultRegistry(), + RegistryEntry: process.DaemonEntry{ + Code: "core", + Daemon: "agent", + Project: "core-agent", + Binary: "core-agent", + }, + }) + + if err := daemon.Start(); err != nil { + return cli.Wrap(err, "daemon start") + } + + // Start monitor mon.Start(cmd.Context()) + // Mark ready + daemon.SetReady(true) + fmt.Fprintf(os.Stderr, "core-agent serving on %s (health: %s, pid: %s)\n", addr, healthAddr, pidFile) + + // Set env so mcp.Run picks HTTP transport + os.Setenv("MCP_HTTP_ADDR", addr) + + // Run MCP server (blocks until context cancelled) return mcpSvc.Run(cmd.Context()) }) cli.RootCmd().AddCommand(mcpCmd) + cli.RootCmd().AddCommand(serveCmd) if err := cli.Execute(); err != nil { log.Fatal(err) diff --git a/config/agents.yaml b/config/agents.yaml index a3043f6..b1486d2 100644 --- a/config/agents.yaml +++ b/config/agents.yaml @@ -11,7 +11,7 @@ dispatch: # Per-agent concurrency limits (0 = unlimited) concurrency: - claude: 3 + claude: 5 gemini: 1 codex: 1 local: 1 @@ -41,6 +41,16 @@ rates: sustained_delay: 0 burst_window: 0 burst_delay: 0 + coderabbit: + reset_utc: "00:00" + daily_limit: 0 + # CodeRabbit enforces its own rate limits (~8/hour on Pro) + # The CLI returns retry-after time which we parse dynamically. + # These are conservative defaults for when we can't parse. + min_delay: 300 + sustained_delay: 450 + burst_window: 0 + burst_delay: 300 codex: reset_utc: "00:00" daily_limit: 0 diff --git a/docs/github-app-setup.md b/docs/github-app-setup.md new file mode 100644 index 0000000..dda45a3 --- /dev/null +++ b/docs/github-app-setup.md @@ -0,0 +1,63 @@ +# GitHub App Setup — dAppCore Agent + +## Create the App + +Go to: https://github.com/organizations/dAppCore/settings/apps/new + +### Basic Info +- **App name**: `core-agent` +- **Homepage URL**: `https://core.help` +- **Description**: Automated code sync, review, and CI/CD for the Core ecosystem + +### Webhook +- **Active**: Yes +- **Webhook URL**: `https://api.lthn.sh/api/github/webhook` (we'll build this endpoint) +- **Webhook secret**: (generate one — save it for the server) + +### Permissions + +#### Repository permissions: +- **Contents**: Read & write (push to dev branch) +- **Pull requests**: Read & write (create, merge, comment) +- **Issues**: Read & write (create from findings) +- **Checks**: Read & write (report build status) +- **Actions**: Read (check workflow status) +- **Metadata**: Read (always required) + +#### Organization permissions: +- None needed + +### Subscribe to events: +- Pull request +- Pull request review +- Push +- Check run +- Check suite + +### Where can this app be installed? +- **Only on this account** (dAppCore org only) + +## After Creation + +1. Note the **App ID** and **Client ID** +2. Generate a **Private Key** (.pem file) +3. Install the app on the dAppCore organization (all repos) +4. Save credentials: + ```bash + mkdir -p ~/.core/github-app + # Save the .pem file + cp ~/Downloads/core-agent.*.pem ~/.core/github-app/private-key.pem + # Save app ID + echo "APP_ID" > ~/.core/github-app/app-id + ``` + +## Webhook Handler + +The webhook handler at `api.lthn.sh/api/github/webhook` will: + +1. **pull_request_review (approved)** → auto-merge the PR +2. **pull_request_review (changes_requested)** → extract findings, dispatch fix agent +3. **push (to main)** → update Forge mirror (reverse sync) +4. **check_run (completed)** → report status back + +All events are also stored in uptelligence for the CodeRabbit KPI tracking. diff --git a/google/gemini-cli/GEMINI.md b/google/gemini-cli/GEMINI.md index d00a4bb..ab7b17f 100644 --- a/google/gemini-cli/GEMINI.md +++ b/google/gemini-cli/GEMINI.md @@ -1,20 +1,50 @@ -# Host UK Core Agent +# GEMINI.md -This extension provides tools and workflows for the Host UK development environment. -It helps with code review, verification, QA, and CI tasks. +Instructions for Google Gemini CLI when working in the Core ecosystem. -## Key Features +## MCP Tools Available -- **Core CLI Integration**: Enforces the use of `core` CLI (`host-uk/core` wrapper for go/php tools) to ensure consistency. -- **Auto-formatting**: Automatically formats Go and PHP code on edit. -- **Safety Checks**: Blocks destructive commands like `rm -rf` to prevent accidents. -- **Skills**: Provides data collection skills for various crypto/blockchain domains (e.g., Ledger papers, BitcoinTalk archives). -- **Codex Awareness**: Surfaces Codex guidance from `core-agent/codex/AGENTS.md`. -- **Ethics Modal**: Embeds the Axioms of Life ethics modal and strings safety guardrails. +You have access to core-agent MCP tools via the extension. Use them: -## Codex Commands +- `brain_recall` — Search OpenBrain for context about any package, pattern, or decision +- `brain_remember` — Store what you learn for other agents (Claude, Codex, future LEM) +- `agentic_dispatch` — Dispatch tasks to other agents +- `agentic_status` — Check agent workspace status -- `/codex:awareness` - Show full Codex guidance. -- `/codex:overview` - Show Codex plugin overview. -- `/codex:core-cli` - Show core CLI mapping. -- `/codex:safety` - Show safety guardrails. +**ALWAYS `brain_remember` significant findings** — your analysis of patterns, conventions, security observations. This builds the shared knowledge base that all agents read. + +## Core Ecosystem Conventions + +### Go Packages (forge.lthn.ai/core/*) + +- **Error handling**: `coreerr.E("pkg.Method", "what failed", err)` from `go-log`. NEVER `fmt.Errorf`. + - Import as: `coreerr "forge.lthn.ai/core/go-log"` + - Always 3 args: operation, message, cause (use `nil` if no cause) + +- **File I/O**: `coreio.Local.Read/Write/Delete/EnsureDir` from `go-io`. NEVER `os.ReadFile`. + - Import as: `coreio "forge.lthn.ai/core/go-io"` + +- **UK English**: colour, organisation, centre, initialise + +- **Test naming**: `TestFoo_Good`, `TestFoo_Bad`, `TestFoo_Ugly` + +- **Commits**: `type(scope): description` with `Co-Authored-By: Virgil ` + +### PHP Packages (CorePHP) + +- **Actions pattern**: `use Action` trait, static `::run()` helper +- **Tenant isolation**: `BelongsToWorkspace` on ALL tenant models +- **Strict types**: `declare(strict_types=1)` everywhere + +## Your Role + +You are best used for: +- **Fast batch operations** — convention sweeps, i18n, docs +- **Lightweight coding** — small fixes, boilerplate, test generation +- **Quick audits** — file scans, pattern matching + +Leave deep security review to Codex and complex architecture to Claude. + +## Training Data + +Your work generates training data for LEM. Be consistent with conventions — every file you touch should follow the patterns above perfectly. diff --git a/google/gemini-cli/gemini-extension.json b/google/gemini-cli/gemini-extension.json index aea6552..5a857b7 100644 --- a/google/gemini-cli/gemini-extension.json +++ b/google/gemini-cli/gemini-extension.json @@ -5,11 +5,8 @@ "contextFileName": "GEMINI.md", "mcpServers": { "core-agent": { - "command": "node", - "args": [ - "${extensionPath}/src/index.js" - ], - "cwd": "${extensionPath}" + "command": "/Users/snider/go/bin/core-agent", + "args": ["mcp"] } } } diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index b54429a..e324daa 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -67,12 +67,18 @@ func agentCommand(agent, prompt string) (string, []string, error) { } return "gemini", args, nil case "codex": + if model == "review" { + // Codex review mode — non-interactive code review + // Note: --base and prompt are mutually exclusive in codex CLI + return "codex", []string{"review", "--base", "HEAD~1"}, nil + } + // Codex agent mode — autonomous coding return "codex", []string{"--approval-mode", "full-auto", "-q", prompt}, nil case "claude": args := []string{ "-p", prompt, "--output-format", "text", - "--permission-mode", "bypassPermissions", + "--dangerously-skip-permissions", "--no-session-persistence", "--append-system-prompt", "SANDBOX: You are restricted to the current directory (src/) only. " + "Do NOT use absolute paths starting with /. Do NOT cd .. or navigate outside. " + @@ -82,6 +88,17 @@ func agentCommand(agent, prompt string) (string, []string, error) { args = append(args, "--model", model) } return "claude", args, nil + case "coderabbit": + args := []string{"review", "--plain", "--base", "HEAD~1"} + if model != "" { + // model variant can specify review type: all, committed, uncommitted + args = append(args, "--type", model) + } + if prompt != "" { + // Pass CLAUDE.md or other config as additional instructions + args = append(args, "--config", "CLAUDE.md") + } + return "coderabbit", args, nil case "local": home, _ := os.UserHomeDir() script := filepath.Join(home, "Code", "core", "agent", "scripts", "local-agent.sh") @@ -94,6 +111,9 @@ func agentCommand(agent, prompt string) (string, []string, error) { // spawnAgent launches an agent process via go-process and returns the PID. // Output is captured via pipes and written to the log file on completion. // The background goroutine handles status updates, findings ingestion, and queue drain. +// +// For CodeRabbit agents, no process is spawned — instead the code is pushed +// to GitHub and a PR is created/marked ready for review. func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir, srcDir string) (int, string, error) { command, args, err := agentCommand(agent, prompt) if err != nil { @@ -103,65 +123,93 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir, srcDir string) (int, st outputFile := filepath.Join(wsDir, fmt.Sprintf("agent-%s.log", agent)) proc, err := process.StartWithOptions(context.Background(), process.RunOptions{ - Command: command, - Args: args, - Dir: srcDir, - Env: []string{"TERM=dumb", "NO_COLOR=1", "CI=true"}, - Detach: true, + Command: command, + Args: args, + Dir: srcDir, + Env: []string{"TERM=dumb", "NO_COLOR=1", "CI=true", "GOWORK=off"}, + Detach: true, + KillGroup: true, + Timeout: 30 * time.Minute, + GracePeriod: 10 * time.Second, }) if err != nil { return 0, "", coreerr.E("dispatch.spawnAgent", "failed to spawn "+agent, err) } + // Close stdin immediately — agents use -p mode, not interactive stdin. + // Without this, Claude CLI blocks waiting on the open pipe. + proc.CloseStdin() + pid := proc.Info().PID go func() { - // Wait for process exit with PID polling fallback. - // go-process Wait() can hang if child processes inherit pipes. - // Poll the PID every 5s — if the process is gone, force completion. - done := make(chan struct{}) - go func() { - proc.Wait() - close(done) - }() - + // Wait for process exit. go-process handles timeout and kill group. + // PID polling fallback in case pipes hang from inherited child processes. ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { - case <-done: - goto completed + case <-proc.Done(): + goto done case <-ticker.C: - // Check if main process is still alive - p, err := os.FindProcess(pid) - if err != nil { - goto completed - } - if err := p.Signal(syscall.Signal(0)); err != nil { - // Process is dead — force cleanup - goto completed + if err := syscall.Kill(pid, 0); err != nil { + goto done } } } - completed: + done: // Write captured output to log file if output := proc.Output(); output != "" { coreio.Local.Write(outputFile, output) } - // Update status to completed - if st, err := readStatus(wsDir); err == nil { - st.Status = "completed" - st.PID = 0 - writeStatus(wsDir, st) + // Determine final status: check exit code, BLOCKED.md, and output + finalStatus := "completed" + exitCode := proc.Info().ExitCode + procStatus := proc.Info().Status + + // Check for BLOCKED.md (agent is asking a question) + blockedPath := filepath.Join(wsDir, "src", "BLOCKED.md") + if blockedContent, err := coreio.Local.Read(blockedPath); err == nil && strings.TrimSpace(blockedContent) != "" { + finalStatus = "blocked" + if st, err := readStatus(wsDir); err == nil { + st.Status = "blocked" + st.Question = strings.TrimSpace(blockedContent) + st.PID = 0 + writeStatus(wsDir, st) + } + } else if exitCode != 0 || procStatus == "failed" || procStatus == "killed" { + finalStatus = "failed" + if st, err := readStatus(wsDir); err == nil { + st.Status = "failed" + st.PID = 0 + if exitCode != 0 { + st.Question = fmt.Sprintf("Agent exited with code %d", exitCode) + } + writeStatus(wsDir, st) + } + } else { + if st, err := readStatus(wsDir); err == nil { + st.Status = "completed" + st.PID = 0 + writeStatus(wsDir, st) + } } // Emit completion event emitCompletionEvent(agent, filepath.Base(wsDir)) - // Auto-create PR if agent made commits - s.autoCreatePR(wsDir) + // Notify monitor immediately (push to connected clients) + if s.onComplete != nil { + s.onComplete.Poke() + } + + // Auto-create PR if agent completed successfully, then verify and merge + if finalStatus == "completed" { + s.autoCreatePR(wsDir) + s.autoVerifyAndMerge(wsDir) + } // Ingest scan findings as issues s.ingestFindings(wsDir) diff --git a/pkg/agentic/mirror.go b/pkg/agentic/mirror.go new file mode 100644 index 0000000..6c226f3 --- /dev/null +++ b/pkg/agentic/mirror.go @@ -0,0 +1,278 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + coreerr "forge.lthn.ai/core/go-log" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// --- agentic_mirror tool --- + +// MirrorInput is the input for agentic_mirror. +type MirrorInput struct { + Repo string `json:"repo,omitempty"` // Specific repo, or empty for all + DryRun bool `json:"dry_run,omitempty"` // Preview without pushing + MaxFiles int `json:"max_files,omitempty"` // Max files per PR (default 50, CodeRabbit limit) +} + +// MirrorOutput is the output for agentic_mirror. +type MirrorOutput struct { + Success bool `json:"success"` + Synced []MirrorSync `json:"synced"` + Skipped []string `json:"skipped,omitempty"` + Count int `json:"count"` +} + +// MirrorSync records one repo sync. +type MirrorSync struct { + Repo string `json:"repo"` + CommitsAhead int `json:"commits_ahead"` + FilesChanged int `json:"files_changed"` + PRURL string `json:"pr_url,omitempty"` + Pushed bool `json:"pushed"` + Skipped string `json:"skipped,omitempty"` +} + +func (s *PrepSubsystem) registerMirrorTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_mirror", + Description: "Sync Forge repos to GitHub mirrors. Pushes Forge main to GitHub dev branch and creates a PR. Respects file count limits for CodeRabbit review.", + }, s.mirror) +} + +func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, input MirrorInput) (*mcp.CallToolResult, MirrorOutput, error) { + maxFiles := input.MaxFiles + if maxFiles <= 0 { + maxFiles = 50 + } + + basePath := s.codePath + if basePath == "" { + home, _ := os.UserHomeDir() + basePath = filepath.Join(home, "Code", "core") + } else { + basePath = filepath.Join(basePath, "core") + } + + // Build list of repos to sync + var repos []string + if input.Repo != "" { + repos = []string{input.Repo} + } else { + repos = s.listLocalRepos(basePath) + } + + var synced []MirrorSync + var skipped []string + + for _, repo := range repos { + repoDir := filepath.Join(basePath, repo) + + // Check if github remote exists + if !hasRemote(repoDir, "github") { + skipped = append(skipped, repo+": no github remote") + continue + } + + // Fetch github to get current state + fetchCmd := exec.CommandContext(ctx, "git", "fetch", "github") + fetchCmd.Dir = repoDir + fetchCmd.Run() + + // Check how far ahead we are + ahead := commitsAhead(repoDir, "github/main", "HEAD") + if ahead == 0 { + continue // Already in sync + } + + // Count files changed + files := filesChanged(repoDir, "github/main", "HEAD") + + sync := MirrorSync{ + Repo: repo, + CommitsAhead: ahead, + FilesChanged: files, + } + + // Skip if too many files for one PR + if files > maxFiles { + sync.Skipped = fmt.Sprintf("%d files exceeds limit of %d", files, maxFiles) + synced = append(synced, sync) + continue + } + + if input.DryRun { + sync.Skipped = "dry run" + synced = append(synced, sync) + continue + } + + // Ensure dev branch exists on GitHub + ensureDevBranch(repoDir) + + // Push local main to github dev + pushCmd := exec.CommandContext(ctx, "git", "push", "github", "HEAD:refs/heads/dev", "--force") + pushCmd.Dir = repoDir + if err := pushCmd.Run(); err != nil { + sync.Skipped = fmt.Sprintf("push failed: %v", err) + synced = append(synced, sync) + continue + } + sync.Pushed = true + + // Create PR: dev → main on GitHub + prURL, err := s.createGitHubPR(ctx, repoDir, repo, ahead, files) + if err != nil { + sync.Skipped = fmt.Sprintf("PR creation failed: %v", err) + } else { + sync.PRURL = prURL + } + + synced = append(synced, sync) + } + + return nil, MirrorOutput{ + Success: true, + Synced: synced, + Skipped: skipped, + Count: len(synced), + }, nil +} + +// createGitHubPR creates a PR from dev → main using the gh CLI. +func (s *PrepSubsystem) createGitHubPR(ctx context.Context, repoDir, repo string, commits, files int) (string, error) { + // Check if there's already an open PR from dev + checkCmd := exec.CommandContext(ctx, "gh", "pr", "list", "--head", "dev", "--state", "open", "--json", "url", "--limit", "1") + checkCmd.Dir = repoDir + out, err := checkCmd.Output() + if err == nil && strings.Contains(string(out), "url") { + // PR already exists — extract URL + // Format: [{"url":"https://..."}] + url := extractJSONField(string(out), "url") + if url != "" { + return url, nil + } + } + + // Build PR body + body := fmt.Sprintf("## Forge → GitHub Sync\n\n"+ + "**Commits:** %d\n"+ + "**Files changed:** %d\n\n"+ + "Automated sync from Forge (forge.lthn.ai) to GitHub mirror.\n"+ + "Review with CodeRabbit before merging.\n\n"+ + "---\n"+ + "Co-Authored-By: Virgil ", + commits, files) + + title := fmt.Sprintf("[sync] %s: %d commits, %d files", repo, commits, files) + + prCmd := exec.CommandContext(ctx, "gh", "pr", "create", + "--head", "dev", + "--base", "main", + "--title", title, + "--body", body, + ) + prCmd.Dir = repoDir + prOut, err := prCmd.CombinedOutput() + if err != nil { + return "", coreerr.E("createGitHubPR", string(prOut), err) + } + + // gh pr create outputs the PR URL on the last line + lines := strings.Split(strings.TrimSpace(string(prOut)), "\n") + if len(lines) > 0 { + return lines[len(lines)-1], nil + } + + return "", nil +} + +// ensureDevBranch creates the dev branch on GitHub if it doesn't exist. +func ensureDevBranch(repoDir string) { + // Try to push current main as dev — if dev exists this is a no-op (we force-push later) + cmd := exec.Command("git", "push", "github", "HEAD:refs/heads/dev", "--no-force") + cmd.Dir = repoDir + cmd.Run() // Ignore error — branch may already exist +} + +// hasRemote checks if a git remote exists. +func hasRemote(repoDir, name string) bool { + cmd := exec.Command("git", "remote", "get-url", name) + cmd.Dir = repoDir + return cmd.Run() == nil +} + +// commitsAhead returns how many commits HEAD is ahead of the ref. +func commitsAhead(repoDir, base, head string) int { + cmd := exec.Command("git", "rev-list", base+".."+head, "--count") + cmd.Dir = repoDir + out, err := cmd.Output() + if err != nil { + return 0 + } + var n int + fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &n) + return n +} + +// filesChanged returns the number of files changed between two refs. +func filesChanged(repoDir, base, head string) int { + cmd := exec.Command("git", "diff", "--name-only", base+".."+head) + cmd.Dir = repoDir + out, err := cmd.Output() + if err != nil { + return 0 + } + lines := strings.Split(strings.TrimSpace(string(out)), "\n") + if len(lines) == 1 && lines[0] == "" { + return 0 + } + return len(lines) +} + +// listLocalRepos returns repo names that exist as directories in basePath. +func (s *PrepSubsystem) listLocalRepos(basePath string) []string { + entries, err := os.ReadDir(basePath) + if err != nil { + return nil + } + var repos []string + for _, e := range entries { + if !e.IsDir() { + continue + } + // Must have a .git directory + if _, err := os.Stat(filepath.Join(basePath, e.Name(), ".git")); err == nil { + repos = append(repos, e.Name()) + } + } + return repos +} + +// extractJSONField extracts a simple string field from JSON array output. +func extractJSONField(jsonStr, field string) string { + // Quick and dirty — works for gh CLI output like [{"url":"https://..."}] + key := fmt.Sprintf(`"%s":"`, field) + idx := strings.Index(jsonStr, key) + if idx < 0 { + return "" + } + start := idx + len(key) + end := strings.Index(jsonStr[start:], `"`) + if end < 0 { + return "" + } + return jsonStr[start : start+end] +} + +// Ensure time is imported (used by other files in package). +var _ = time.Now diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 6350255..d9cac80 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -23,6 +23,12 @@ import ( "gopkg.in/yaml.v3" ) +// CompletionNotifier is called when an agent completes, to trigger +// immediate notifications to connected clients. +type CompletionNotifier interface { + Poke() +} + // PrepSubsystem provides agentic MCP tools. type PrepSubsystem struct { forgeURL string @@ -31,7 +37,8 @@ type PrepSubsystem struct { brainKey string specsPath string codePath string - client *http.Client + client *http.Client + onComplete CompletionNotifier } // NewPrep creates an agentic subsystem. @@ -61,6 +68,11 @@ func NewPrep() *PrepSubsystem { } } +// SetCompletionNotifier wires up the monitor for immediate push on agent completion. +func (s *PrepSubsystem) SetCompletionNotifier(n CompletionNotifier) { + s.onComplete = n +} + func envOr(key, fallback string) string { if v := os.Getenv(key); v != "" { return v @@ -84,6 +96,10 @@ func (s *PrepSubsystem) RegisterTools(server *mcp.Server) { s.registerCreatePRTool(server) s.registerListPRsTool(server) s.registerEpicTool(server) + s.registerMirrorTool(server) + s.registerRemoteDispatchTool(server) + s.registerRemoteStatusTool(server) + s.registerReviewQueueTool(server) mcp.AddTool(server, &mcp.Tool{ Name: "agentic_scan", @@ -273,6 +289,36 @@ Review all Go files in src/ for security issues: - Unsafe use of os/exec Report findings with severity (critical/high/medium/low) and file:line references. +` + case "verify": + prompt = `Read PERSONA.md if it exists — adopt that identity and approach. +Read CLAUDE.md for project conventions and context. + +You are verifying a pull request. The code in src/ contains changes on a feature branch. + +## Your Tasks + +1. **Run tests**: Execute the project's test suite (go test ./..., composer test, or npm test). Report results. +2. **Review diff**: Run ` + "`git diff origin/main..HEAD`" + ` to see all changes. Review for: + - Correctness: Does the code do what the commit messages say? + - Security: Path traversal, injection, hardcoded secrets, unsafe input handling + - Conventions: coreerr.E() not fmt.Errorf, go-io not os.ReadFile, UK English + - Test coverage: Are new functions tested? +3. **Verdict**: Write VERDICT.md with: + - PASS or FAIL (first line, nothing else) + - Summary of findings (if any) + - List of issues by severity (critical/high/medium/low) + +If PASS: the PR will be auto-merged. +If FAIL: your findings will be commented on the PR for the original agent to address. + +Be strict but fair. A missing test is medium. A security issue is critical. A typo is low. + +## SANDBOX BOUNDARY (HARD LIMIT) + +You are restricted to the current directory and its subdirectories ONLY. +- Do NOT use absolute paths +- Do NOT navigate outside this repository ` case "coding": prompt = `Read PERSONA.md if it exists — adopt that identity and approach. diff --git a/pkg/agentic/remote.go b/pkg/agentic/remote.go new file mode 100644 index 0000000..4a0db97 --- /dev/null +++ b/pkg/agentic/remote.go @@ -0,0 +1,202 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "strings" + "time" + + coreerr "forge.lthn.ai/core/go-log" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// --- agentic_dispatch_remote tool --- + +// RemoteDispatchInput dispatches a task to a remote core-agent over HTTP. +type RemoteDispatchInput struct { + Host string `json:"host"` // Remote agent host (e.g. "charon", "10.69.69.165:9101") + Repo string `json:"repo"` // Target repo + Task string `json:"task"` // What the agent should do + Agent string `json:"agent,omitempty"` // Agent type (default: claude:opus) + Template string `json:"template,omitempty"` // Prompt template + Persona string `json:"persona,omitempty"` // Persona slug + Org string `json:"org,omitempty"` // Forge org (default: core) + Variables map[string]string `json:"variables,omitempty"` // Template variables +} + +// RemoteDispatchOutput is the response from a remote dispatch. +type RemoteDispatchOutput struct { + Success bool `json:"success"` + Host string `json:"host"` + Repo string `json:"repo"` + Agent string `json:"agent"` + WorkspaceDir string `json:"workspace_dir,omitempty"` + PID int `json:"pid,omitempty"` + Error string `json:"error,omitempty"` +} + +func (s *PrepSubsystem) registerRemoteDispatchTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_dispatch_remote", + Description: "Dispatch a task to a remote core-agent (e.g. Charon). The remote agent preps a workspace and spawns the task locally on its hardware.", + }, s.dispatchRemote) +} + +func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolRequest, input RemoteDispatchInput) (*mcp.CallToolResult, RemoteDispatchOutput, error) { + if input.Host == "" { + return nil, RemoteDispatchOutput{}, coreerr.E("dispatchRemote", "host is required", nil) + } + if input.Repo == "" { + return nil, RemoteDispatchOutput{}, coreerr.E("dispatchRemote", "repo is required", nil) + } + if input.Task == "" { + return nil, RemoteDispatchOutput{}, coreerr.E("dispatchRemote", "task is required", nil) + } + + // Resolve host aliases + addr := resolveHost(input.Host) + + // Get auth token for remote agent + token := remoteToken(input.Host) + + // Build the MCP JSON-RPC call to agentic_dispatch on the remote + callParams := map[string]any{ + "repo": input.Repo, + "task": input.Task, + } + if input.Agent != "" { + callParams["agent"] = input.Agent + } + if input.Template != "" { + callParams["template"] = input.Template + } + if input.Persona != "" { + callParams["persona"] = input.Persona + } + if input.Org != "" { + callParams["org"] = input.Org + } + if len(input.Variables) > 0 { + callParams["variables"] = input.Variables + } + + rpcReq := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": map[string]any{ + "name": "agentic_dispatch", + "arguments": callParams, + }, + } + + url := fmt.Sprintf("http://%s/mcp", addr) + client := &http.Client{Timeout: 30 * time.Second} + + // Step 1: Initialize session + sessionID, err := mcpInitialize(ctx, client, url, token) + if err != nil { + return nil, RemoteDispatchOutput{ + Host: input.Host, + Error: fmt.Sprintf("init failed: %v", err), + }, coreerr.E("dispatchRemote", "MCP initialize failed", err) + } + + // Step 2: Call the tool + body, _ := json.Marshal(rpcReq) + result, err := mcpCall(ctx, client, url, token, sessionID, body) + if err != nil { + return nil, RemoteDispatchOutput{ + Host: input.Host, + Error: fmt.Sprintf("call failed: %v", err), + }, coreerr.E("dispatchRemote", "tool call failed", err) + } + + // Parse result + output := RemoteDispatchOutput{ + Success: true, + Host: input.Host, + Repo: input.Repo, + Agent: input.Agent, + } + + var rpcResp struct { + Result struct { + Content []struct { + Text string `json:"text"` + } `json:"content"` + } `json:"result"` + Error *struct { + Message string `json:"message"` + } `json:"error"` + } + if json.Unmarshal(result, &rpcResp) == nil { + if rpcResp.Error != nil { + output.Success = false + output.Error = rpcResp.Error.Message + } else if len(rpcResp.Result.Content) > 0 { + var dispatchOut DispatchOutput + if json.Unmarshal([]byte(rpcResp.Result.Content[0].Text), &dispatchOut) == nil { + output.WorkspaceDir = dispatchOut.WorkspaceDir + output.PID = dispatchOut.PID + output.Agent = dispatchOut.Agent + } + } + } + + return nil, output, nil +} + +// resolveHost maps friendly names to addresses. +func resolveHost(host string) string { + // Known hosts + aliases := map[string]string{ + "charon": "10.69.69.165:9101", + "cladius": "127.0.0.1:9101", + "local": "127.0.0.1:9101", + } + + if addr, ok := aliases[strings.ToLower(host)]; ok { + return addr + } + + // If no port specified, add default + if !strings.Contains(host, ":") { + return host + ":9101" + } + + return host +} + +// remoteToken gets the auth token for a remote agent. +func remoteToken(host string) string { + // Check environment first + envKey := fmt.Sprintf("AGENT_TOKEN_%s", strings.ToUpper(host)) + if token := os.Getenv(envKey); token != "" { + return token + } + + // Fallback to shared agent token + if token := os.Getenv("MCP_AUTH_TOKEN"); token != "" { + return token + } + + // Try reading from file + home, _ := os.UserHomeDir() + tokenFiles := []string{ + fmt.Sprintf("%s/.core/tokens/%s.token", home, strings.ToLower(host)), + fmt.Sprintf("%s/.core/agent-token", home), + } + for _, f := range tokenFiles { + if data, err := os.ReadFile(f); err == nil { + return strings.TrimSpace(string(data)) + } + } + + return "" +} diff --git a/pkg/agentic/remote_client.go b/pkg/agentic/remote_client.go new file mode 100644 index 0000000..d9c9058 --- /dev/null +++ b/pkg/agentic/remote_client.go @@ -0,0 +1,128 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + coreerr "forge.lthn.ai/core/go-log" +) + +// mcpInitialize performs the MCP initialize handshake over Streamable HTTP. +// Returns the session ID from the Mcp-Session-Id header. +func mcpInitialize(ctx context.Context, client *http.Client, url, token string) (string, error) { + initReq := map[string]any{ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": map[string]any{ + "protocolVersion": "2025-03-26", + "capabilities": map[string]any{}, + "clientInfo": map[string]any{ + "name": "core-agent-remote", + "version": "0.2.0", + }, + }, + } + + body, _ := json.Marshal(initReq) + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return "", coreerr.E("mcpInitialize", "create request", err) + } + setHeaders(req, token, "") + + resp, err := client.Do(req) + if err != nil { + return "", coreerr.E("mcpInitialize", "request failed", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return "", coreerr.E("mcpInitialize", fmt.Sprintf("HTTP %d", resp.StatusCode), nil) + } + + sessionID := resp.Header.Get("Mcp-Session-Id") + + // Drain the SSE response (we don't need the initialize result) + drainSSE(resp) + + // Send initialized notification + notif := map[string]any{ + "jsonrpc": "2.0", + "method": "notifications/initialized", + } + notifBody, _ := json.Marshal(notif) + + notifReq, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(notifBody)) + setHeaders(notifReq, token, sessionID) + + notifResp, err := client.Do(notifReq) + if err == nil { + notifResp.Body.Close() + } + + return sessionID, nil +} + +// mcpCall sends a JSON-RPC request and returns the parsed response. +// Handles the SSE response format (text/event-stream with data: lines). +func mcpCall(ctx context.Context, client *http.Client, url, token, sessionID string, body []byte) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return nil, coreerr.E("mcpCall", "create request", err) + } + setHeaders(req, token, sessionID) + + resp, err := client.Do(req) + if err != nil { + return nil, coreerr.E("mcpCall", "request failed", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, coreerr.E("mcpCall", fmt.Sprintf("HTTP %d", resp.StatusCode), nil) + } + + // Parse SSE response — extract data: lines + return readSSEData(resp) +} + +// readSSEData reads an SSE response and extracts the JSON from data: lines. +func readSSEData(resp *http.Response) ([]byte, error) { + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "data: ") { + return []byte(strings.TrimPrefix(line, "data: ")), nil + } + } + return nil, coreerr.E("readSSEData", "no data in SSE response", nil) +} + +// setHeaders applies standard MCP HTTP headers. +func setHeaders(req *http.Request, token, sessionID string) { + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json, text/event-stream") + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + if sessionID != "" { + req.Header.Set("Mcp-Session-Id", sessionID) + } +} + +// drainSSE reads and discards an SSE response body. +func drainSSE(resp *http.Response) { + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + // Discard + } +} diff --git a/pkg/agentic/remote_status.go b/pkg/agentic/remote_status.go new file mode 100644 index 0000000..535ed26 --- /dev/null +++ b/pkg/agentic/remote_status.go @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "encoding/json" + "net/http" + "time" + + coreerr "forge.lthn.ai/core/go-log" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// --- agentic_status_remote tool --- + +// RemoteStatusInput queries a remote core-agent for workspace status. +type RemoteStatusInput struct { + Host string `json:"host"` // Remote agent host (e.g. "charon") +} + +// RemoteStatusOutput is the response from a remote status check. +type RemoteStatusOutput struct { + Success bool `json:"success"` + Host string `json:"host"` + Workspaces []WorkspaceInfo `json:"workspaces"` + Count int `json:"count"` + Error string `json:"error,omitempty"` +} + +func (s *PrepSubsystem) registerRemoteStatusTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_status_remote", + Description: "Check workspace status on a remote core-agent (e.g. Charon). Shows running, completed, blocked, and failed agents.", + }, s.statusRemote) +} + +func (s *PrepSubsystem) statusRemote(ctx context.Context, _ *mcp.CallToolRequest, input RemoteStatusInput) (*mcp.CallToolResult, RemoteStatusOutput, error) { + if input.Host == "" { + return nil, RemoteStatusOutput{}, coreerr.E("statusRemote", "host is required", nil) + } + + addr := resolveHost(input.Host) + token := remoteToken(input.Host) + url := "http://" + addr + "/mcp" + + client := &http.Client{Timeout: 15 * time.Second} + + sessionID, err := mcpInitialize(ctx, client, url, token) + if err != nil { + return nil, RemoteStatusOutput{ + Host: input.Host, + Error: "unreachable: " + err.Error(), + }, nil + } + + rpcReq := map[string]any{ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": map[string]any{ + "name": "agentic_status", + "arguments": map[string]any{}, + }, + } + body, _ := json.Marshal(rpcReq) + + result, err := mcpCall(ctx, client, url, token, sessionID, body) + if err != nil { + return nil, RemoteStatusOutput{ + Host: input.Host, + Error: "call failed: " + err.Error(), + }, nil + } + + output := RemoteStatusOutput{ + Success: true, + Host: input.Host, + } + + var rpcResp struct { + Result struct { + Content []struct { + Text string `json:"text"` + } `json:"content"` + } `json:"result"` + } + if json.Unmarshal(result, &rpcResp) == nil && len(rpcResp.Result.Content) > 0 { + var statusOut StatusOutput + if json.Unmarshal([]byte(rpcResp.Result.Content[0].Text), &statusOut) == nil { + output.Workspaces = statusOut.Workspaces + output.Count = statusOut.Count + } + } + + return nil, output, nil +} diff --git a/pkg/agentic/review_queue.go b/pkg/agentic/review_queue.go new file mode 100644 index 0000000..53fde04 --- /dev/null +++ b/pkg/agentic/review_queue.go @@ -0,0 +1,367 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "strings" + "time" + + coreio "forge.lthn.ai/core/go-io" + coreerr "forge.lthn.ai/core/go-log" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +// --- agentic_review_queue tool --- + +// ReviewQueueInput controls the review queue runner. +type ReviewQueueInput struct { + Limit int `json:"limit,omitempty"` // Max PRs to process this run (default: 4) + Reviewer string `json:"reviewer,omitempty"` // "coderabbit" (default), "codex", or "both" + DryRun bool `json:"dry_run,omitempty"` // Preview without acting + LocalOnly bool `json:"local_only,omitempty"` // Run review locally, don't touch GitHub +} + +// ReviewQueueOutput reports what happened. +type ReviewQueueOutput struct { + Success bool `json:"success"` + Processed []ReviewResult `json:"processed"` + Skipped []string `json:"skipped,omitempty"` + RateLimit *RateLimitInfo `json:"rate_limit,omitempty"` +} + +// ReviewResult is the outcome of reviewing one repo. +type ReviewResult struct { + Repo string `json:"repo"` + Verdict string `json:"verdict"` // clean, findings, rate_limited, error + Findings int `json:"findings"` // Number of findings (0 = clean) + Action string `json:"action"` // merged, fix_dispatched, skipped, waiting + Detail string `json:"detail,omitempty"` +} + +// RateLimitInfo tracks CodeRabbit rate limit state. +type RateLimitInfo struct { + Limited bool `json:"limited"` + RetryAt time.Time `json:"retry_at,omitempty"` + Message string `json:"message,omitempty"` +} + +func (s *PrepSubsystem) registerReviewQueueTool(server *mcp.Server) { + mcp.AddTool(server, &mcp.Tool{ + Name: "agentic_review_queue", + Description: "Process the CodeRabbit review queue. Runs local CodeRabbit review on repos, auto-merges clean ones on GitHub, dispatches fix agents for findings. Respects rate limits.", + }, s.reviewQueue) +} + +func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest, input ReviewQueueInput) (*mcp.CallToolResult, ReviewQueueOutput, error) { + limit := input.Limit + if limit <= 0 { + limit = 4 + } + + basePath := filepath.Join(s.codePath, "core") + + // Find repos with draft PRs (ahead of GitHub) + candidates := s.findReviewCandidates(basePath) + if len(candidates) == 0 { + return nil, ReviewQueueOutput{ + Success: true, + Processed: nil, + }, nil + } + + var processed []ReviewResult + var skipped []string + var rateInfo *RateLimitInfo + + for _, repo := range candidates { + if len(processed) >= limit { + skipped = append(skipped, repo+" (limit reached)") + continue + } + + // Check rate limit from previous run + if rateInfo != nil && rateInfo.Limited && time.Now().Before(rateInfo.RetryAt) { + skipped = append(skipped, repo+" (rate limited)") + continue + } + + repoDir := filepath.Join(basePath, repo) + result := s.reviewRepo(ctx, repoDir, repo, input.DryRun, input.LocalOnly) + + // Parse rate limit from result + if result.Verdict == "rate_limited" { + retryAfter := parseRetryAfter(result.Detail) + rateInfo = &RateLimitInfo{ + Limited: true, + RetryAt: time.Now().Add(retryAfter), + Message: result.Detail, + } + // Don't count rate-limited as processed — save the slot + skipped = append(skipped, repo+" (rate limited: "+retryAfter.String()+")") + continue + } + + processed = append(processed, result) + } + + // Save rate limit state for next run + if rateInfo != nil { + s.saveRateLimitState(rateInfo) + } + + return nil, ReviewQueueOutput{ + Success: true, + Processed: processed, + Skipped: skipped, + RateLimit: rateInfo, + }, nil +} + +// findReviewCandidates returns repos that are ahead of GitHub main. +func (s *PrepSubsystem) findReviewCandidates(basePath string) []string { + entries, err := os.ReadDir(basePath) + if err != nil { + return nil + } + + var candidates []string + for _, e := range entries { + if !e.IsDir() { + continue + } + repoDir := filepath.Join(basePath, e.Name()) + if !hasRemote(repoDir, "github") { + continue + } + ahead := commitsAhead(repoDir, "github/main", "HEAD") + if ahead > 0 { + candidates = append(candidates, e.Name()) + } + } + return candidates +} + +// reviewRepo runs CodeRabbit on a single repo and takes action. +func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo string, dryRun, localOnly bool) ReviewResult { + result := ReviewResult{Repo: repo} + + // Check saved rate limit + if rl := s.loadRateLimitState(); rl != nil && rl.Limited && time.Now().Before(rl.RetryAt) { + result.Verdict = "rate_limited" + result.Detail = fmt.Sprintf("retry after %s", rl.RetryAt.Format(time.RFC3339)) + return result + } + + // Run reviewer CLI locally + reviewer := "coderabbit" // default, can be overridden by caller + cmd := s.buildReviewCommand(ctx, repoDir, reviewer) + out, err := cmd.CombinedOutput() + output := string(out) + + // Parse rate limit (both reviewers use similar patterns) + if strings.Contains(output, "Rate limit exceeded") || strings.Contains(output, "rate limit") { + result.Verdict = "rate_limited" + result.Detail = output + return result + } + + // Parse error + if err != nil && !strings.Contains(output, "No findings") && !strings.Contains(output, "no issues") { + result.Verdict = "error" + result.Detail = output + return result + } + + // Store raw output for training data + s.storeReviewOutput(repoDir, repo, reviewer, output) + + // Parse verdict + if strings.Contains(output, "No findings") || strings.Contains(output, "no issues") || strings.Contains(output, "LGTM") { + result.Verdict = "clean" + result.Findings = 0 + + if dryRun { + result.Action = "skipped (dry run)" + return result + } + + if localOnly { + result.Action = "clean (local only)" + return result + } + + // Push to GitHub and mark PR ready / merge + if err := s.pushAndMerge(ctx, repoDir, repo); err != nil { + result.Action = "push failed: " + err.Error() + } else { + result.Action = "merged" + } + } else { + // Has findings — count them and dispatch fix agent + result.Verdict = "findings" + result.Findings = countFindings(output) + result.Detail = truncate(output, 500) + + if dryRun { + result.Action = "skipped (dry run)" + return result + } + + // Save findings for agent dispatch + findingsFile := filepath.Join(repoDir, ".core", "coderabbit-findings.txt") + coreio.Local.Write(findingsFile, output) + + // Dispatch fix agent with the findings + task := fmt.Sprintf("Fix CodeRabbit findings. The review output is in .core/coderabbit-findings.txt. "+ + "Read it, verify each finding against the code, fix what's valid. Run tests. "+ + "Commit: fix(coderabbit): address review findings\n\nFindings summary (%d issues):\n%s", + result.Findings, truncate(output, 1500)) + + s.dispatchFixFromQueue(ctx, repo, task) + result.Action = "fix_dispatched" + } + + return result +} + +// pushAndMerge pushes to GitHub dev and merges the PR. +func (s *PrepSubsystem) pushAndMerge(ctx context.Context, repoDir, repo string) error { + // Push to dev + pushCmd := exec.CommandContext(ctx, "git", "push", "github", "HEAD:refs/heads/dev", "--force") + pushCmd.Dir = repoDir + if out, err := pushCmd.CombinedOutput(); err != nil { + return coreerr.E("pushAndMerge", "push failed: "+string(out), err) + } + + // Mark PR ready if draft + readyCmd := exec.CommandContext(ctx, "gh", "pr", "ready", "1") + readyCmd.Dir = repoDir + readyCmd.Run() // Ignore error — might already be ready + + // Try to merge + mergeCmd := exec.CommandContext(ctx, "gh", "pr", "merge", "1", "--merge", "--delete-branch") + mergeCmd.Dir = repoDir + if out, err := mergeCmd.CombinedOutput(); err != nil { + return coreerr.E("pushAndMerge", "merge failed: "+string(out), err) + } + + return nil +} + +// dispatchFixFromQueue dispatches an opus agent to fix CodeRabbit findings. +func (s *PrepSubsystem) dispatchFixFromQueue(ctx context.Context, repo, task string) { + // Use the dispatch system — creates workspace, spawns agent + input := DispatchInput{ + Repo: repo, + Task: task, + Agent: "claude:opus", + } + s.dispatch(ctx, nil, input) +} + +// countFindings estimates the number of findings in CodeRabbit output. +func countFindings(output string) int { + // Count lines that look like findings + count := 0 + for _, line := range strings.Split(output, "\n") { + trimmed := strings.TrimSpace(line) + if strings.HasPrefix(trimmed, "- ") || strings.HasPrefix(trimmed, "* ") || + strings.Contains(trimmed, "Issue:") || strings.Contains(trimmed, "Finding:") || + strings.Contains(trimmed, "⚠") || strings.Contains(trimmed, "❌") { + count++ + } + } + if count == 0 && !strings.Contains(output, "No findings") { + count = 1 // At least one finding if not clean + } + return count +} + +// parseRetryAfter extracts the retry duration from a rate limit message. +// Example: "please try after 4 minutes and 56 seconds" +func parseRetryAfter(message string) time.Duration { + re := regexp.MustCompile(`(\d+)\s*minutes?\s*(?:and\s*)?(\d+)?\s*seconds?`) + matches := re.FindStringSubmatch(message) + if len(matches) >= 2 { + mins, _ := strconv.Atoi(matches[1]) + secs := 0 + if len(matches) >= 3 && matches[2] != "" { + secs, _ = strconv.Atoi(matches[2]) + } + return time.Duration(mins)*time.Minute + time.Duration(secs)*time.Second + } + // Default: 5 minutes + return 5 * time.Minute +} + +// buildReviewCommand creates the CLI command for the chosen reviewer. +func (s *PrepSubsystem) buildReviewCommand(ctx context.Context, repoDir, reviewer string) *exec.Cmd { + switch reviewer { + case "codex": + return exec.CommandContext(ctx, "codex", "review", "--base", "github/main") + default: // coderabbit + return exec.CommandContext(ctx, "coderabbit", "review", "--plain", + "--base", "github/main", "--config", "CLAUDE.md", "--cwd", repoDir) + } +} + +// storeReviewOutput saves raw review output for training data collection. +func (s *PrepSubsystem) storeReviewOutput(repoDir, repo, reviewer, output string) { + home, _ := os.UserHomeDir() + dataDir := filepath.Join(home, ".core", "training", "reviews") + coreio.Local.EnsureDir(dataDir) + + timestamp := time.Now().Format("2006-01-02T15-04-05") + filename := fmt.Sprintf("%s_%s_%s.txt", repo, reviewer, timestamp) + + // Write raw output + coreio.Local.Write(filepath.Join(dataDir, filename), output) + + // Append to JSONL for structured training + entry := map[string]string{ + "repo": repo, + "reviewer": reviewer, + "timestamp": time.Now().Format(time.RFC3339), + "output": output, + "verdict": "clean", + } + if !strings.Contains(output, "No findings") && !strings.Contains(output, "no issues") { + entry["verdict"] = "findings" + } + jsonLine, _ := json.Marshal(entry) + + jsonlPath := filepath.Join(dataDir, "reviews.jsonl") + existing, _ := coreio.Local.Read(jsonlPath) + coreio.Local.Write(jsonlPath, existing+string(jsonLine)+"\n") +} + +// saveRateLimitState persists rate limit info for cross-run awareness. +func (s *PrepSubsystem) saveRateLimitState(info *RateLimitInfo) { + home, _ := os.UserHomeDir() + path := filepath.Join(home, ".core", "coderabbit-ratelimit.json") + data, _ := json.Marshal(info) + coreio.Local.Write(path, string(data)) +} + +// loadRateLimitState reads persisted rate limit info. +func (s *PrepSubsystem) loadRateLimitState() *RateLimitInfo { + home, _ := os.UserHomeDir() + path := filepath.Join(home, ".core", "coderabbit-ratelimit.json") + data, err := coreio.Local.Read(path) + if err != nil { + return nil + } + var info RateLimitInfo + if json.Unmarshal([]byte(data), &info) != nil { + return nil + } + return &info +} diff --git a/pkg/agentic/status.go b/pkg/agentic/status.go index db30b33..9e396f3 100644 --- a/pkg/agentic/status.go +++ b/pkg/agentic/status.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strings" + "syscall" "time" coreio "forge.lthn.ai/core/go-io" @@ -25,6 +26,7 @@ import ( // running → completed (normal finish) // running → blocked (agent wrote BLOCKED.md and exited) // blocked → running (resume after ANSWER.md provided) +// completed → merged (PR verified and auto-merged) // running → failed (agent crashed / non-zero exit) // WorkspaceStatus represents the current state of an agent workspace. @@ -147,8 +149,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu // If status is "running", check if PID is still alive if st.Status == "running" && st.PID > 0 { - proc, err := os.FindProcess(st.PID) - if err != nil || proc.Signal(nil) != nil { + if err := syscall.Kill(st.PID, 0); err != nil { // Process died — check for BLOCKED.md blockedPath := filepath.Join(wsDir, "src", "BLOCKED.md") if data, err := coreio.Local.Read(blockedPath); err == nil { diff --git a/pkg/agentic/verify.go b/pkg/agentic/verify.go new file mode 100644 index 0000000..1eb820c --- /dev/null +++ b/pkg/agentic/verify.go @@ -0,0 +1,345 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package agentic + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + coreio "forge.lthn.ai/core/go-io" + coreerr "forge.lthn.ai/core/go-log" +) + +// autoVerifyAndMerge runs inline tests (fast gate) and merges if they pass. +// If tests fail or merge fails due to conflict, attempts one rebase+retry. +// If the retry also fails, labels the PR "needs-review" for human attention. +// +// For deeper review (security, conventions), dispatch a separate task: +// +// agentic_dispatch repo=go-crypt template=verify persona=engineering/engineering-security-engineer +func (s *PrepSubsystem) autoVerifyAndMerge(wsDir string) { + st, err := readStatus(wsDir) + if err != nil || st.PRURL == "" || st.Repo == "" { + return + } + + srcDir := filepath.Join(wsDir, "src") + org := st.Org + if org == "" { + org = "core" + } + + prNum := extractPRNumber(st.PRURL) + if prNum == 0 { + return + } + + // Attempt 1: run tests and try to merge + result := s.attemptVerifyAndMerge(srcDir, org, st.Repo, st.Branch, prNum) + if result == mergeSuccess { + if st2, err := readStatus(wsDir); err == nil { + st2.Status = "merged" + writeStatus(wsDir, st2) + } + return + } + + // Attempt 2: rebase onto main and retry + if result == mergeConflict || result == testFailed { + rebaseOK := s.rebaseBranch(srcDir, st.Branch) + if rebaseOK { + result2 := s.attemptVerifyAndMerge(srcDir, org, st.Repo, st.Branch, prNum) + if result2 == mergeSuccess { + if st2, err := readStatus(wsDir); err == nil { + st2.Status = "merged" + writeStatus(wsDir, st2) + } + return + } + } + } + + // Both attempts failed — flag for human review + s.flagForReview(org, st.Repo, prNum, result) + + if st2, err := readStatus(wsDir); err == nil { + st2.Question = "Flagged for review — auto-merge failed after retry" + writeStatus(wsDir, st2) + } +} + +type mergeResult int + +const ( + mergeSuccess mergeResult = iota + testFailed // tests didn't pass + mergeConflict // tests passed but merge failed (conflict) +) + +// attemptVerifyAndMerge runs tests and tries to merge. Returns the outcome. +func (s *PrepSubsystem) attemptVerifyAndMerge(srcDir, org, repo, branch string, prNum int) mergeResult { + testResult := s.runVerification(srcDir) + + if !testResult.passed { + comment := fmt.Sprintf("## Verification Failed\n\n**Command:** `%s`\n\n```\n%s\n```\n\n**Exit code:** %d", + testResult.testCmd, truncate(testResult.output, 2000), testResult.exitCode) + s.commentOnIssue(context.Background(), org, repo, prNum, comment) + return testFailed + } + + // Tests passed — try merge + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := s.forgeMergePR(ctx, org, repo, prNum); err != nil { + comment := fmt.Sprintf("## Tests Passed — Merge Failed\n\n`%s` passed but merge failed: %v", testResult.testCmd, err) + s.commentOnIssue(context.Background(), org, repo, prNum, comment) + return mergeConflict + } + + comment := fmt.Sprintf("## Auto-Verified & Merged\n\n**Tests:** `%s` — PASS\n\nAuto-merged by core-agent dispatch system.", testResult.testCmd) + s.commentOnIssue(context.Background(), org, repo, prNum, comment) + return mergeSuccess +} + +// rebaseBranch rebases the current branch onto origin/main and force-pushes. +func (s *PrepSubsystem) rebaseBranch(srcDir, branch string) bool { + // Fetch latest main + fetch := exec.Command("git", "fetch", "origin", "main") + fetch.Dir = srcDir + if err := fetch.Run(); err != nil { + return false + } + + // Rebase onto main + rebase := exec.Command("git", "rebase", "origin/main") + rebase.Dir = srcDir + if out, err := rebase.CombinedOutput(); err != nil { + // Rebase failed — abort and give up + abort := exec.Command("git", "rebase", "--abort") + abort.Dir = srcDir + abort.Run() + _ = out + return false + } + + // Force-push the rebased branch + push := exec.Command("git", "push", "--force-with-lease", "origin", branch) + push.Dir = srcDir + return push.Run() == nil +} + +// flagForReview adds the "needs-review" label to the PR via Forge API. +func (s *PrepSubsystem) flagForReview(org, repo string, prNum int, result mergeResult) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + // Ensure the label exists + s.ensureLabel(ctx, org, repo, "needs-review", "e11d48") + + // Add label to PR + payload, _ := json.Marshal(map[string]any{ + "labels": []int{s.getLabelID(ctx, org, repo, "needs-review")}, + }) + url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues/%d/labels", s.forgeURL, org, repo, prNum) + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "token "+s.forgeToken) + resp, err := s.client.Do(req) + if err == nil { + resp.Body.Close() + } + + // Comment explaining the situation + reason := "Tests failed after rebase" + if result == mergeConflict { + reason = "Merge conflict persists after rebase" + } + comment := fmt.Sprintf("## Needs Review\n\n%s. Auto-merge gave up after retry.\n\nLabelled `needs-review` for human attention.", reason) + s.commentOnIssue(ctx, org, repo, prNum, comment) +} + +// ensureLabel creates a label if it doesn't exist. +func (s *PrepSubsystem) ensureLabel(ctx context.Context, org, repo, name, colour string) { + payload, _ := json.Marshal(map[string]string{ + "name": name, + "color": "#" + colour, + }) + url := fmt.Sprintf("%s/api/v1/repos/%s/%s/labels", s.forgeURL, org, repo) + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "token "+s.forgeToken) + resp, err := s.client.Do(req) + if err == nil { + resp.Body.Close() + } +} + +// getLabelID fetches the ID of a label by name. +func (s *PrepSubsystem) getLabelID(ctx context.Context, org, repo, name string) int { + url := fmt.Sprintf("%s/api/v1/repos/%s/%s/labels?token=%s", s.forgeURL, org, repo, s.forgeToken) + req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) + resp, err := s.client.Do(req) + if err != nil { + return 0 + } + defer resp.Body.Close() + + var labels []struct { + ID int `json:"id"` + Name string `json:"name"` + } + json.NewDecoder(resp.Body).Decode(&labels) + for _, l := range labels { + if l.Name == name { + return l.ID + } + } + return 0 +} + +// verifyResult holds the outcome of running tests. +type verifyResult struct { + passed bool + output string + exitCode int + testCmd string +} + +// runVerification detects the project type and runs the appropriate test suite. +func (s *PrepSubsystem) runVerification(srcDir string) verifyResult { + if fileExists(filepath.Join(srcDir, "go.mod")) { + return s.runGoTests(srcDir) + } + if fileExists(filepath.Join(srcDir, "composer.json")) { + return s.runPHPTests(srcDir) + } + if fileExists(filepath.Join(srcDir, "package.json")) { + return s.runNodeTests(srcDir) + } + return verifyResult{passed: true, testCmd: "none", output: "No test runner detected"} +} + +func (s *PrepSubsystem) runGoTests(srcDir string) verifyResult { + cmd := exec.Command("go", "test", "./...", "-count=1", "-timeout", "120s") + cmd.Dir = srcDir + cmd.Env = append(os.Environ(), "GOWORK=off") + out, err := cmd.CombinedOutput() + + exitCode := 0 + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + } else { + exitCode = 1 + } + } + + return verifyResult{passed: exitCode == 0, output: string(out), exitCode: exitCode, testCmd: "go test ./..."} +} + +func (s *PrepSubsystem) runPHPTests(srcDir string) verifyResult { + cmd := exec.Command("composer", "test", "--no-interaction") + cmd.Dir = srcDir + out, err := cmd.CombinedOutput() + + exitCode := 0 + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + } else { + cmd2 := exec.Command("./vendor/bin/pest", "--no-interaction") + cmd2.Dir = srcDir + out2, err2 := cmd2.CombinedOutput() + if err2 != nil { + return verifyResult{passed: true, testCmd: "none", output: "No PHP test runner found"} + } + return verifyResult{passed: true, output: string(out2), exitCode: 0, testCmd: "vendor/bin/pest"} + } + } + + return verifyResult{passed: exitCode == 0, output: string(out), exitCode: exitCode, testCmd: "composer test"} +} + +func (s *PrepSubsystem) runNodeTests(srcDir string) verifyResult { + data, err := coreio.Local.Read(filepath.Join(srcDir, "package.json")) + if err != nil { + return verifyResult{passed: true, testCmd: "none", output: "Could not read package.json"} + } + + var pkg struct { + Scripts map[string]string `json:"scripts"` + } + if json.Unmarshal([]byte(data), &pkg) != nil || pkg.Scripts["test"] == "" { + return verifyResult{passed: true, testCmd: "none", output: "No test script in package.json"} + } + + cmd := exec.Command("npm", "test") + cmd.Dir = srcDir + out, err := cmd.CombinedOutput() + + exitCode := 0 + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + } else { + exitCode = 1 + } + } + + return verifyResult{passed: exitCode == 0, output: string(out), exitCode: exitCode, testCmd: "npm test"} +} + +// forgeMergePR merges a PR via the Forge API. +func (s *PrepSubsystem) forgeMergePR(ctx context.Context, org, repo string, prNum int) error { + payload, _ := json.Marshal(map[string]any{ + "Do": "merge", + "merge_message_field": "Auto-merged by core-agent after verification\n\nCo-Authored-By: Virgil ", + "delete_branch_after_merge": true, + }) + + url := fmt.Sprintf("%s/api/v1/repos/%s/%s/pulls/%d/merge", s.forgeURL, org, repo, prNum) + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "token "+s.forgeToken) + + resp, err := s.client.Do(req) + if err != nil { + return coreerr.E("forgeMergePR", "request failed", err) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 && resp.StatusCode != 204 { + var errBody map[string]any + json.NewDecoder(resp.Body).Decode(&errBody) + msg, _ := errBody["message"].(string) + return coreerr.E("forgeMergePR", fmt.Sprintf("HTTP %d: %s", resp.StatusCode, msg), nil) + } + + return nil +} + +// extractPRNumber gets the PR number from a Forge PR URL. +func extractPRNumber(prURL string) int { + parts := strings.Split(prURL, "/") + if len(parts) == 0 { + return 0 + } + var num int + fmt.Sscanf(parts[len(parts)-1], "%d", &num) + return num +} + +// fileExists checks if a file exists. +func fileExists(path string) bool { + data, err := coreio.Local.Read(path) + return err == nil && data != "" +} diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 91c6f52..c69337b 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -34,7 +34,11 @@ type Subsystem struct { // Track last seen state to only notify on changes lastCompletedCount int lastInboxCount int + lastSyncTimestamp int64 mu sync.Mutex + + // Event-driven poke channel — dispatch goroutine sends here on completion + poke chan struct{} } // Options configures the monitor. @@ -51,6 +55,7 @@ func New(opts ...Options) *Subsystem { } return &Subsystem{ interval: interval, + poke: make(chan struct{}, 1), } } @@ -90,6 +95,15 @@ func (m *Subsystem) Shutdown(_ context.Context) error { return nil } +// Poke triggers an immediate check cycle. Non-blocking — if a poke is already +// pending it's a no-op. Call this from dispatch when an agent completes. +func (m *Subsystem) Poke() { + select { + case m.poke <- struct{}{}: + default: + } +} + func (m *Subsystem) loop(ctx context.Context) { // Initial check after short delay (let server fully start) select { @@ -98,6 +112,9 @@ func (m *Subsystem) loop(ctx context.Context) { case <-time.After(5 * time.Second): } + // Initialise sync timestamp to now (don't pull everything on first run) + m.initSyncTimestamp() + // Run first check immediately m.check(ctx) @@ -110,6 +127,8 @@ func (m *Subsystem) loop(ctx context.Context) { return case <-ticker.C: m.check(ctx) + case <-m.poke: + m.check(ctx) } } } @@ -127,6 +146,11 @@ func (m *Subsystem) check(ctx context.Context) { messages = append(messages, msg) } + // Sync repos from other agents' changes + if msg := m.syncRepos(); msg != "" { + messages = append(messages, msg) + } + // Only notify if there's something new if len(messages) == 0 { return @@ -222,7 +246,9 @@ func (m *Subsystem) checkInbox() string { var resp struct { Data []struct { - Read bool `json:"read"` + Read bool `json:"read"` + From string `json:"from_agent"` + Subject string `json:"subject"` } `json:"data"` } if json.Unmarshal(out, &resp) != nil { @@ -230,9 +256,17 @@ func (m *Subsystem) checkInbox() string { } unread := 0 + senders := make(map[string]int) + latestSubject := "" for _, msg := range resp.Data { if !msg.Read { unread++ + if msg.From != "" { + senders[msg.From]++ + } + if latestSubject == "" { + latestSubject = msg.Subject + } } } @@ -245,6 +279,21 @@ func (m *Subsystem) checkInbox() string { return "" } + // Write marker file for the PostToolUse hook to pick up + var senderList []string + for s, count := range senders { + if count > 1 { + senderList = append(senderList, fmt.Sprintf("%s (%d)", s, count)) + } else { + senderList = append(senderList, s) + } + } + notify := fmt.Sprintf("📬 %d new message(s) from %s", unread-prevInbox, strings.Join(senderList, ", ")) + if latestSubject != "" { + notify += fmt.Sprintf(" — \"%s\"", latestSubject) + } + os.WriteFile("/tmp/claude-inbox-notify", []byte(notify), 0644) + return fmt.Sprintf("%d unread message(s) in inbox", unread) } diff --git a/pkg/monitor/sync.go b/pkg/monitor/sync.go new file mode 100644 index 0000000..181824d --- /dev/null +++ b/pkg/monitor/sync.go @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package monitor + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +// CheckinResponse is what the API returns for an agent checkin. +type CheckinResponse struct { + // Repos that have new commits since the agent's last checkin. + Changed []ChangedRepo `json:"changed,omitempty"` + // Server timestamp — use as "since" on next checkin. + Timestamp int64 `json:"timestamp"` +} + +// ChangedRepo is a repo that has new commits. +type ChangedRepo struct { + Repo string `json:"repo"` + Branch string `json:"branch"` + SHA string `json:"sha"` +} + +// syncRepos calls the checkin API and pulls any repos that changed. +// Returns a human-readable message if repos were updated, empty string otherwise. +func (m *Subsystem) syncRepos() string { + apiURL := os.Getenv("CORE_API_URL") + if apiURL == "" { + apiURL = "https://api.lthn.sh" + } + + agentName := agentName() + + url := fmt.Sprintf("%s/v1/agent/checkin?agent=%s&since=%d", apiURL, agentName, m.lastSyncTimestamp) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return "" + } + + // Use brain key for auth + brainKey := os.Getenv("CORE_BRAIN_KEY") + if brainKey == "" { + home, _ := os.UserHomeDir() + if data, err := os.ReadFile(filepath.Join(home, ".claude", "brain.key")); err == nil { + brainKey = strings.TrimSpace(string(data)) + } + } + if brainKey != "" { + req.Header.Set("Authorization", "Bearer "+brainKey) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "" + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return "" + } + + var checkin CheckinResponse + if json.NewDecoder(resp.Body).Decode(&checkin) != nil { + return "" + } + + // Update timestamp for next checkin + m.mu.Lock() + m.lastSyncTimestamp = checkin.Timestamp + m.mu.Unlock() + + if len(checkin.Changed) == 0 { + return "" + } + + // Pull changed repos + basePath := os.Getenv("CODE_PATH") + if basePath == "" { + home, _ := os.UserHomeDir() + basePath = filepath.Join(home, "Code", "core") + } + + var pulled []string + for _, repo := range checkin.Changed { + repoDir := filepath.Join(basePath, repo.Repo) + if _, err := os.Stat(repoDir); err != nil { + continue + } + + // Check if we're already on main and clean + branchCmd := exec.Command("git", "rev-parse", "--abbrev-ref", "HEAD") + branchCmd.Dir = repoDir + branch, err := branchCmd.Output() + if err != nil || strings.TrimSpace(string(branch)) != "main" { + continue // Don't pull if not on main + } + + statusCmd := exec.Command("git", "status", "--porcelain") + statusCmd.Dir = repoDir + status, _ := statusCmd.Output() + if len(strings.TrimSpace(string(status))) > 0 { + continue // Don't pull if dirty + } + + // Fast-forward pull + pullCmd := exec.Command("git", "pull", "--ff-only", "origin", "main") + pullCmd.Dir = repoDir + if pullCmd.Run() == nil { + pulled = append(pulled, repo.Repo) + } + } + + if len(pulled) == 0 { + return "" + } + + return fmt.Sprintf("Synced %d repo(s): %s", len(pulled), strings.Join(pulled, ", ")) +} + +// lastSyncTimestamp is stored on the subsystem — add it via the check cycle. +// Initialised to "now" on first run so we don't pull everything on startup. +func (m *Subsystem) initSyncTimestamp() { + m.mu.Lock() + if m.lastSyncTimestamp == 0 { + m.lastSyncTimestamp = time.Now().Unix() + } + m.mu.Unlock() +} diff --git a/src/php/Controllers/Api/CheckinController.php b/src/php/Controllers/Api/CheckinController.php new file mode 100644 index 0000000..35a8370 --- /dev/null +++ b/src/php/Controllers/Api/CheckinController.php @@ -0,0 +1,83 @@ +query('since', '0'); + $agent = $request->query('agent', 'unknown'); + + $sinceDate = $since > 0 + ? \Carbon\Carbon::createFromTimestamp($since) + : now()->subMinutes(5); + + // Query webhook deliveries for push events since the given time. + // Forgejo sends GitHub-compatible webhooks, so event_type is "github.push.*". + $deliveries = UptelligenceWebhookDelivery::query() + ->where('created_at', '>', $sinceDate) + ->where('event_type', 'like', '%push%') + ->where('status', '!=', 'failed') + ->orderBy('created_at', 'asc') + ->get(); + + $changed = []; + $seen = []; + + foreach ($deliveries as $delivery) { + $payload = $delivery->payload; + if (! is_array($payload)) { + continue; + } + + // Extract repo name and branch from Forgejo/GitHub push payload + $repoName = $payload['repository']['name'] ?? null; + $ref = $payload['ref'] ?? ''; + $sha = $payload['after'] ?? ''; + + // Only track pushes to main/master + if (! $repoName || ! str_ends_with($ref, '/main') && ! str_ends_with($ref, '/master')) { + continue; + } + + $branch = basename($ref); + + // Deduplicate — only latest push per repo + if (isset($seen[$repoName])) { + continue; + } + $seen[$repoName] = true; + + $changed[] = [ + 'repo' => $repoName, + 'branch' => $branch, + 'sha' => $sha, + ]; + } + + return response()->json([ + 'changed' => $changed, + 'timestamp' => now()->timestamp, + 'agent' => $agent, + ]); + } +} diff --git a/src/php/Controllers/Api/GitHubWebhookController.php b/src/php/Controllers/Api/GitHubWebhookController.php new file mode 100644 index 0000000..9ee823b --- /dev/null +++ b/src/php/Controllers/Api/GitHubWebhookController.php @@ -0,0 +1,211 @@ +verifySignature($request, $secret)) { + Log::warning('GitHub webhook signature verification failed', [ + 'ip' => $request->ip(), + ]); + + return response('Invalid signature', 401); + } + + $event = $request->header('X-GitHub-Event', 'unknown'); + $payload = $request->json()->all(); + + Log::info('GitHub webhook received', [ + 'event' => $event, + 'action' => $payload['action'] ?? 'none', + 'repo' => $payload['repository']['full_name'] ?? 'unknown', + ]); + + // Store raw event for KPI tracking + $this->storeEvent($event, $payload); + + return match ($event) { + 'pull_request_review' => $this->handlePullRequestReview($payload), + 'push' => $this->handlePush($payload), + 'check_run' => $this->handleCheckRun($payload), + default => response()->json(['status' => 'ignored', 'event' => $event]), + }; + } + + /** + * Handle pull_request_review events. + * + * - approved by coderabbitai → queue auto-merge + * - changes_requested by coderabbitai → store findings for agent dispatch + */ + protected function handlePullRequestReview(array $payload): JsonResponse + { + $action = $payload['action'] ?? ''; + $review = $payload['review'] ?? []; + $pr = $payload['pull_request'] ?? []; + $reviewer = $review['user']['login'] ?? ''; + $state = $review['state'] ?? ''; + $repo = $payload['repository']['name'] ?? ''; + $prNumber = $pr['number'] ?? 0; + + if ($reviewer !== 'coderabbitai') { + return response()->json(['status' => 'ignored', 'reason' => 'not coderabbit']); + } + + if ($state === 'approved') { + Log::info('CodeRabbit approved PR', [ + 'repo' => $repo, + 'pr' => $prNumber, + ]); + + // Store approval event + $this->storeCodeRabbitResult($repo, $prNumber, 'approved', null); + + return response()->json([ + 'status' => 'approved', + 'repo' => $repo, + 'pr' => $prNumber, + 'action' => 'merge_queued', + ]); + } + + if ($state === 'changes_requested') { + $body = $review['body'] ?? ''; + + Log::info('CodeRabbit requested changes', [ + 'repo' => $repo, + 'pr' => $prNumber, + 'body_length' => strlen($body), + ]); + + // Store findings for agent dispatch + $this->storeCodeRabbitResult($repo, $prNumber, 'changes_requested', $body); + + return response()->json([ + 'status' => 'changes_requested', + 'repo' => $repo, + 'pr' => $prNumber, + 'action' => 'findings_stored', + ]); + } + + return response()->json(['status' => 'ignored', 'state' => $state]); + } + + /** + * Handle push events (future: reverse sync to Forge). + */ + protected function handlePush(array $payload): JsonResponse + { + $repo = $payload['repository']['name'] ?? ''; + $ref = $payload['ref'] ?? ''; + $after = $payload['after'] ?? ''; + + Log::info('GitHub push', [ + 'repo' => $repo, + 'ref' => $ref, + 'sha' => substr($after, 0, 8), + ]); + + return response()->json(['status' => 'logged', 'repo' => $repo]); + } + + /** + * Handle check_run events (future: build status tracking). + */ + protected function handleCheckRun(array $payload): JsonResponse + { + return response()->json(['status' => 'logged']); + } + + /** + * Verify GitHub webhook signature (SHA-256). + */ + protected function verifySignature(Request $request, string $secret): bool + { + $signature = $request->header('X-Hub-Signature-256', ''); + if (empty($signature)) { + return false; + } + + $payload = $request->getContent(); + $expected = 'sha256=' . hash_hmac('sha256', $payload, $secret); + + return hash_equals($expected, $signature); + } + + /** + * Store raw webhook event for KPI tracking. + */ + protected function storeEvent(string $event, array $payload): void + { + $repo = $payload['repository']['name'] ?? 'unknown'; + $action = $payload['action'] ?? ''; + + // Store in uptelligence webhook deliveries if available + try { + \DB::table('github_webhook_events')->insert([ + 'event' => $event, + 'action' => $action, + 'repo' => $repo, + 'payload' => json_encode($payload), + 'created_at' => now(), + ]); + } catch (\Throwable) { + // Table may not exist yet — log only + Log::debug('GitHub webhook event stored in log only', [ + 'event' => $event, + 'repo' => $repo, + ]); + } + } + + /** + * Store CodeRabbit review result for KPI tracking. + */ + protected function storeCodeRabbitResult(string $repo, int $prNumber, string $result, ?string $body): void + { + try { + \DB::table('coderabbit_reviews')->insert([ + 'repo' => $repo, + 'pr_number' => $prNumber, + 'result' => $result, + 'findings' => $body, + 'created_at' => now(), + ]); + } catch (\Throwable) { + Log::debug('CodeRabbit result stored in log only', [ + 'repo' => $repo, + 'pr' => $prNumber, + 'result' => $result, + ]); + } + } +} diff --git a/src/php/Migrations/2026_03_17_000001_create_github_tracking_tables.php b/src/php/Migrations/2026_03_17_000001_create_github_tracking_tables.php new file mode 100644 index 0000000..28e43de --- /dev/null +++ b/src/php/Migrations/2026_03_17_000001_create_github_tracking_tables.php @@ -0,0 +1,44 @@ +id(); + $table->string('event', 50)->index(); + $table->string('action', 50)->default(''); + $table->string('repo', 100)->index(); + $table->json('payload'); + $table->timestamp('created_at')->useCurrent(); + }); + + // CodeRabbit review results — the KPI table + Schema::create('coderabbit_reviews', function (Blueprint $table) { + $table->id(); + $table->string('repo', 100)->index(); + $table->unsignedInteger('pr_number'); + $table->string('result', 30)->index(); // approved, changes_requested + $table->text('findings')->nullable(); // Review body with findings + $table->boolean('findings_dispatched')->default(false); + $table->boolean('findings_resolved')->default(false); + $table->timestamp('created_at')->useCurrent(); + $table->timestamp('resolved_at')->nullable(); + + $table->index(['repo', 'pr_number']); + }); + } + + public function down(): void + { + Schema::dropIfExists('coderabbit_reviews'); + Schema::dropIfExists('github_webhook_events'); + } +}; diff --git a/src/php/Routes/api.php b/src/php/Routes/api.php index e9979e8..7ef5770 100644 --- a/src/php/Routes/api.php +++ b/src/php/Routes/api.php @@ -23,6 +23,16 @@ use Illuminate\Support\Facades\Route; // Health check (no auth required) Route::get('v1/health', [AgentApiController::class, 'health']); +// GitHub App webhook (signature-verified, no Bearer auth) +Route::post('github/webhook', [\Core\Mod\Agentic\Controllers\Api\GitHubWebhookController::class, 'receive']) + ->middleware('throttle:120,1'); + +// Agent checkin — discover which repos changed since last sync +// Uses auth.api (brain key) for authentication +Route::middleware(['throttle:120,1', 'auth.api:brain:read'])->group(function () { + Route::get('v1/agent/checkin', [\Core\Mod\Agentic\Controllers\Api\CheckinController::class, 'checkin']); +}); + // Authenticated agent endpoints Route::middleware(AgentApiAuth::class.':plans.read')->group(function () { // Plans (read)