From 7932f3a7bab6434f361f5cb7e52e3b8c65d7ea9d Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 21 Mar 2026 12:56:24 +0000 Subject: [PATCH] feat(agent): wire channel notifications into monitor - Monitor pushes agent.complete, inbox.message, harvest.complete events via ChannelSend instead of temp files - Remove /tmp/claude-inbox-notify file write (channels replace it) - Update mcp.New() to use Options{} struct - Wire mcpSvc as ChannelNotifier after creation Co-Authored-By: Virgil --- cmd/main.go | 11 +- pkg/monitor/harvest.go | 259 +++++++++++++++++++++++++++++++++++++++++ pkg/monitor/monitor.go | 39 ++++++- 3 files changed, 300 insertions(+), 9 deletions(-) create mode 100644 pkg/monitor/harvest.go diff --git a/cmd/main.go b/cmd/main.go index 84cc82c..53cfeb4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -41,15 +41,16 @@ func main() { prep := agentic.NewPrep() prep.SetCompletionNotifier(mon) - mcpSvc, err := mcp.New( - mcp.WithSubsystem(brain.NewDirect()), - mcp.WithSubsystem(prep), - mcp.WithSubsystem(mon), - ) + mcpSvc, err := mcp.New(mcp.Options{ + Subsystems: []mcp.Subsystem{brain.NewDirect(), prep, mon}, + }) if err != nil { return nil, nil, cli.Wrap(err, "create MCP service") } + // Wire channel notifications — monitor pushes events into MCP sessions + mon.SetNotifier(mcpSvc) + return mcpSvc, mon, nil } diff --git a/pkg/monitor/harvest.go b/pkg/monitor/harvest.go new file mode 100644 index 0000000..39db438 --- /dev/null +++ b/pkg/monitor/harvest.go @@ -0,0 +1,259 @@ +// SPDX-License-Identifier: EUPL-1.2 + +// Harvest completed agent workspaces — push changes back to source repos. +// +// After an agent completes, its commits live in the workspace clone. +// This code pushes the agent's branch to the source repo so the +// changes are available for review. It checks for binaries and +// large files before pushing. + +package monitor + +import ( + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + + "dappco.re/go/agent/pkg/agentic" + coreio "forge.lthn.ai/core/go-io" +) + +// harvestResult tracks what happened during harvest. +type harvestResult struct { + repo string + branch string + files int + rejected string // non-empty if rejected (binary, too large, etc.) +} + +// harvestCompleted scans for completed workspaces and pushes their +// branches back to the source repos. Returns a summary message. +func (m *Subsystem) harvestCompleted() string { + wsRoot := agentic.WorkspaceRoot() + entries, err := filepath.Glob(filepath.Join(wsRoot, "*/status.json")) + if err != nil { + return "" + } + + var harvested []harvestResult + + for _, entry := range entries { + wsDir := filepath.Dir(entry) + result := m.harvestWorkspace(wsDir) + if result != nil { + harvested = append(harvested, *result) + } + } + + if len(harvested) == 0 { + return "" + } + + var parts []string + for _, h := range harvested { + if h.rejected != "" { + parts = append(parts, fmt.Sprintf("%s: REJECTED (%s)", h.repo, h.rejected)) + if m.notifier != nil { + m.notifier.ChannelSend(context.Background(), "harvest.rejected", map[string]any{ + "repo": h.repo, + "branch": h.branch, + "reason": h.rejected, + }) + } + } else { + parts = append(parts, fmt.Sprintf("%s: pushed %s (%d files)", h.repo, h.branch, h.files)) + if m.notifier != nil { + m.notifier.ChannelSend(context.Background(), "harvest.complete", map[string]any{ + "repo": h.repo, + "branch": h.branch, + "files": h.files, + }) + } + } + } + return "Harvested: " + strings.Join(parts, ", ") +} + +// harvestWorkspace checks a single workspace and pushes if ready. +func (m *Subsystem) harvestWorkspace(wsDir string) *harvestResult { + data, err := coreio.Local.Read(filepath.Join(wsDir, "status.json")) + if err != nil { + return nil + } + + var st struct { + Status string `json:"status"` + Repo string `json:"repo"` + Branch string `json:"branch"` + } + if json.Unmarshal([]byte(data), &st) != nil { + return nil + } + + // Only harvest completed workspaces (not merged, running, etc.) + if st.Status != "completed" { + return nil + } + + srcDir := filepath.Join(wsDir, "src") + if _, err := os.Stat(srcDir); err != nil { + return nil + } + + // Check if there are commits to push + branch := st.Branch + if branch == "" { + branch = detectBranch(srcDir) + } + if branch == "" || branch == "main" { + return nil + } + + // Check for unpushed commits + unpushed := countUnpushed(srcDir, branch) + if unpushed == 0 { + return nil // already pushed or no commits + } + + // Safety checks before pushing + if reason := checkSafety(srcDir); reason != "" { + updateStatus(wsDir, "rejected", reason) + return &harvestResult{repo: st.Repo, branch: branch, rejected: reason} + } + + // Count changed files + files := countChangedFiles(srcDir) + + // Push the branch to origin (which is the local source repo) + if err := pushBranch(srcDir, branch); err != nil { + return &harvestResult{repo: st.Repo, branch: branch, rejected: "push failed: " + err.Error()} + } + + // Update status to ready-for-review + updateStatus(wsDir, "ready-for-review", "") + + return &harvestResult{repo: st.Repo, branch: branch, files: files} +} + +// detectBranch returns the current branch name. +func detectBranch(srcDir string) string { + cmd := exec.Command("git", "rev-parse", "--abbrev-ref", "HEAD") + cmd.Dir = srcDir + out, err := cmd.Output() + if err != nil { + return "" + } + return strings.TrimSpace(string(out)) +} + +// countUnpushed returns the number of commits ahead of origin. +func countUnpushed(srcDir, branch string) int { + cmd := exec.Command("git", "rev-list", "--count", "origin/main.."+branch) + cmd.Dir = srcDir + out, err := cmd.Output() + if err != nil { + // origin/main might not exist — try counting all commits on branch + cmd2 := exec.Command("git", "log", "--oneline", "main.."+branch) + cmd2.Dir = srcDir + out2, err2 := cmd2.Output() + if err2 != nil { + return 0 + } + lines := strings.Split(strings.TrimSpace(string(out2)), "\n") + if len(lines) == 1 && lines[0] == "" { + return 0 + } + return len(lines) + } + var count int + fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &count) + return count +} + +// checkSafety rejects workspaces with binaries or oversized files. +func checkSafety(srcDir string) string { + // Check for binary files in the diff + cmd := exec.Command("git", "diff", "--name-only", "--diff-filter=A", "main...HEAD") + cmd.Dir = srcDir + out, err := cmd.Output() + if err != nil { + return "" + } + + binaryExts := map[string]bool{ + ".exe": true, ".bin": true, ".so": true, ".dylib": true, + ".dll": true, ".o": true, ".a": true, ".pyc": true, + ".class": true, ".jar": true, ".war": true, + ".zip": true, ".tar": true, ".gz": true, ".bz2": true, + ".png": true, ".jpg": true, ".jpeg": true, ".gif": true, + ".mp3": true, ".mp4": true, ".avi": true, ".mov": true, + ".db": true, ".sqlite": true, ".sqlite3": true, + } + + for _, file := range strings.Split(strings.TrimSpace(string(out)), "\n") { + if file == "" { + continue + } + ext := strings.ToLower(filepath.Ext(file)) + if binaryExts[ext] { + return fmt.Sprintf("binary file added: %s", file) + } + + // Check file size (reject > 1MB) + fullPath := filepath.Join(srcDir, file) + info, err := os.Stat(fullPath) + if err == nil && info.Size() > 1024*1024 { + return fmt.Sprintf("large file: %s (%d bytes)", file, info.Size()) + } + } + + return "" +} + +// countChangedFiles returns the number of files changed vs main. +func countChangedFiles(srcDir string) int { + cmd := exec.Command("git", "diff", "--name-only", "main...HEAD") + cmd.Dir = srcDir + out, err := cmd.Output() + if err != nil { + return 0 + } + lines := strings.Split(strings.TrimSpace(string(out)), "\n") + if len(lines) == 1 && lines[0] == "" { + return 0 + } + return len(lines) +} + +// pushBranch pushes the agent's branch to origin. +func pushBranch(srcDir, branch string) error { + cmd := exec.Command("git", "push", "origin", branch) + cmd.Dir = srcDir + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("%s: %s", err, strings.TrimSpace(string(out))) + } + return nil +} + +// updateStatus updates the workspace status.json. +func updateStatus(wsDir, status, question string) { + data, err := coreio.Local.Read(filepath.Join(wsDir, "status.json")) + if err != nil { + return + } + var st map[string]any + if json.Unmarshal([]byte(data), &st) != nil { + return + } + st["status"] = status + if question != "" { + st["question"] = question + } + updated, _ := json.MarshalIndent(st, "", " ") + coreio.Local.Write(filepath.Join(wsDir, "status.json"), string(updated)) +} diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 7f0941e..c63bfbe 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -26,9 +26,16 @@ import ( "github.com/modelcontextprotocol/go-sdk/mcp" ) +// ChannelNotifier pushes events to connected MCP sessions. +// Matches the Notifier interface in core/mcp without importing it. +type ChannelNotifier interface { + ChannelSend(ctx context.Context, channel string, data any) +} + // Subsystem implements mcp.Subsystem for background monitoring. type Subsystem struct { server *mcp.Server + notifier ChannelNotifier interval time.Duration cancel context.CancelFunc wg sync.WaitGroup @@ -43,6 +50,11 @@ type Subsystem struct { poke chan struct{} } +// SetNotifier wires up channel event broadcasting. +func (m *Subsystem) SetNotifier(n ChannelNotifier) { + m.notifier = n +} + // Options configures the monitor. type Options struct { // Interval between checks (default: 2 minutes) @@ -143,6 +155,11 @@ func (m *Subsystem) check(ctx context.Context) { messages = append(messages, msg) } + // Harvest completed workspaces — push branches, check for binaries + if msg := m.harvestCompleted(); msg != "" { + messages = append(messages, msg) + } + // Check inbox if msg := m.checkInbox(); msg != "" { messages = append(messages, msg) @@ -217,6 +234,16 @@ func (m *Subsystem) checkCompletions() string { return "" } + // Push channel events for each completion + if m.notifier != nil && len(recentlyCompleted) > 0 { + m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{ + "count": newCompletions, + "completed": recentlyCompleted, + "running": running, + "queued": queued, + }) + } + msg := fmt.Sprintf("%d agent(s) completed", newCompletions) if running > 0 { msg += fmt.Sprintf(", %d still running", running) @@ -302,11 +329,15 @@ func (m *Subsystem) checkInbox() string { senderList = append(senderList, s) } } - notify := fmt.Sprintf("📬 %d new message(s) from %s", unread-prevInbox, strings.Join(senderList, ", ")) - if latestSubject != "" { - notify += fmt.Sprintf(" — \"%s\"", latestSubject) + // Push channel event for new messages + if m.notifier != nil { + m.notifier.ChannelSend(context.Background(), "inbox.message", map[string]any{ + "new": unread - prevInbox, + "total": unread, + "senders": senderList, + "subject": latestSubject, + }) } - coreio.Local.Write("/tmp/claude-inbox-notify", notify) return fmt.Sprintf("%d unread message(s) in inbox", unread) }