feat(agent): wire channel notifications into monitor
- Monitor pushes agent.complete, inbox.message, harvest.complete
events via ChannelSend instead of temp files
- Remove /tmp/claude-inbox-notify file write (channels replace it)
- Update mcp.New() to use Options{} struct
- Wire mcpSvc as ChannelNotifier after creation
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
21f234aa7c
commit
2f10c7d368
3 changed files with 300 additions and 9 deletions
11
cmd/main.go
11
cmd/main.go
|
|
@ -41,15 +41,16 @@ func main() {
|
|||
prep := agentic.NewPrep()
|
||||
prep.SetCompletionNotifier(mon)
|
||||
|
||||
mcpSvc, err := mcp.New(
|
||||
mcp.WithSubsystem(brain.NewDirect()),
|
||||
mcp.WithSubsystem(prep),
|
||||
mcp.WithSubsystem(mon),
|
||||
)
|
||||
mcpSvc, err := mcp.New(mcp.Options{
|
||||
Subsystems: []mcp.Subsystem{brain.NewDirect(), prep, mon},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, cli.Wrap(err, "create MCP service")
|
||||
}
|
||||
|
||||
// Wire channel notifications — monitor pushes events into MCP sessions
|
||||
mon.SetNotifier(mcpSvc)
|
||||
|
||||
return mcpSvc, mon, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
259
pkg/monitor/harvest.go
Normal file
259
pkg/monitor/harvest.go
Normal file
|
|
@ -0,0 +1,259 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
// Harvest completed agent workspaces — push changes back to source repos.
|
||||
//
|
||||
// After an agent completes, its commits live in the workspace clone.
|
||||
// This code pushes the agent's branch to the source repo so the
|
||||
// changes are available for review. It checks for binaries and
|
||||
// large files before pushing.
|
||||
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"dappco.re/go/agent/pkg/agentic"
|
||||
coreio "forge.lthn.ai/core/go-io"
|
||||
)
|
||||
|
||||
// harvestResult tracks what happened during harvest.
|
||||
type harvestResult struct {
|
||||
repo string
|
||||
branch string
|
||||
files int
|
||||
rejected string // non-empty if rejected (binary, too large, etc.)
|
||||
}
|
||||
|
||||
// harvestCompleted scans for completed workspaces and pushes their
|
||||
// branches back to the source repos. Returns a summary message.
|
||||
func (m *Subsystem) harvestCompleted() string {
|
||||
wsRoot := agentic.WorkspaceRoot()
|
||||
entries, err := filepath.Glob(filepath.Join(wsRoot, "*/status.json"))
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
var harvested []harvestResult
|
||||
|
||||
for _, entry := range entries {
|
||||
wsDir := filepath.Dir(entry)
|
||||
result := m.harvestWorkspace(wsDir)
|
||||
if result != nil {
|
||||
harvested = append(harvested, *result)
|
||||
}
|
||||
}
|
||||
|
||||
if len(harvested) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
var parts []string
|
||||
for _, h := range harvested {
|
||||
if h.rejected != "" {
|
||||
parts = append(parts, fmt.Sprintf("%s: REJECTED (%s)", h.repo, h.rejected))
|
||||
if m.notifier != nil {
|
||||
m.notifier.ChannelSend(context.Background(), "harvest.rejected", map[string]any{
|
||||
"repo": h.repo,
|
||||
"branch": h.branch,
|
||||
"reason": h.rejected,
|
||||
})
|
||||
}
|
||||
} else {
|
||||
parts = append(parts, fmt.Sprintf("%s: pushed %s (%d files)", h.repo, h.branch, h.files))
|
||||
if m.notifier != nil {
|
||||
m.notifier.ChannelSend(context.Background(), "harvest.complete", map[string]any{
|
||||
"repo": h.repo,
|
||||
"branch": h.branch,
|
||||
"files": h.files,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return "Harvested: " + strings.Join(parts, ", ")
|
||||
}
|
||||
|
||||
// harvestWorkspace checks a single workspace and pushes if ready.
|
||||
func (m *Subsystem) harvestWorkspace(wsDir string) *harvestResult {
|
||||
data, err := coreio.Local.Read(filepath.Join(wsDir, "status.json"))
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var st struct {
|
||||
Status string `json:"status"`
|
||||
Repo string `json:"repo"`
|
||||
Branch string `json:"branch"`
|
||||
}
|
||||
if json.Unmarshal([]byte(data), &st) != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only harvest completed workspaces (not merged, running, etc.)
|
||||
if st.Status != "completed" {
|
||||
return nil
|
||||
}
|
||||
|
||||
srcDir := filepath.Join(wsDir, "src")
|
||||
if _, err := os.Stat(srcDir); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if there are commits to push
|
||||
branch := st.Branch
|
||||
if branch == "" {
|
||||
branch = detectBranch(srcDir)
|
||||
}
|
||||
if branch == "" || branch == "main" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check for unpushed commits
|
||||
unpushed := countUnpushed(srcDir, branch)
|
||||
if unpushed == 0 {
|
||||
return nil // already pushed or no commits
|
||||
}
|
||||
|
||||
// Safety checks before pushing
|
||||
if reason := checkSafety(srcDir); reason != "" {
|
||||
updateStatus(wsDir, "rejected", reason)
|
||||
return &harvestResult{repo: st.Repo, branch: branch, rejected: reason}
|
||||
}
|
||||
|
||||
// Count changed files
|
||||
files := countChangedFiles(srcDir)
|
||||
|
||||
// Push the branch to origin (which is the local source repo)
|
||||
if err := pushBranch(srcDir, branch); err != nil {
|
||||
return &harvestResult{repo: st.Repo, branch: branch, rejected: "push failed: " + err.Error()}
|
||||
}
|
||||
|
||||
// Update status to ready-for-review
|
||||
updateStatus(wsDir, "ready-for-review", "")
|
||||
|
||||
return &harvestResult{repo: st.Repo, branch: branch, files: files}
|
||||
}
|
||||
|
||||
// detectBranch returns the current branch name.
|
||||
func detectBranch(srcDir string) string {
|
||||
cmd := exec.Command("git", "rev-parse", "--abbrev-ref", "HEAD")
|
||||
cmd.Dir = srcDir
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(string(out))
|
||||
}
|
||||
|
||||
// countUnpushed returns the number of commits ahead of origin.
|
||||
func countUnpushed(srcDir, branch string) int {
|
||||
cmd := exec.Command("git", "rev-list", "--count", "origin/main.."+branch)
|
||||
cmd.Dir = srcDir
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
// origin/main might not exist — try counting all commits on branch
|
||||
cmd2 := exec.Command("git", "log", "--oneline", "main.."+branch)
|
||||
cmd2.Dir = srcDir
|
||||
out2, err2 := cmd2.Output()
|
||||
if err2 != nil {
|
||||
return 0
|
||||
}
|
||||
lines := strings.Split(strings.TrimSpace(string(out2)), "\n")
|
||||
if len(lines) == 1 && lines[0] == "" {
|
||||
return 0
|
||||
}
|
||||
return len(lines)
|
||||
}
|
||||
var count int
|
||||
fmt.Sscanf(strings.TrimSpace(string(out)), "%d", &count)
|
||||
return count
|
||||
}
|
||||
|
||||
// checkSafety rejects workspaces with binaries or oversized files.
|
||||
func checkSafety(srcDir string) string {
|
||||
// Check for binary files in the diff
|
||||
cmd := exec.Command("git", "diff", "--name-only", "--diff-filter=A", "main...HEAD")
|
||||
cmd.Dir = srcDir
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
binaryExts := map[string]bool{
|
||||
".exe": true, ".bin": true, ".so": true, ".dylib": true,
|
||||
".dll": true, ".o": true, ".a": true, ".pyc": true,
|
||||
".class": true, ".jar": true, ".war": true,
|
||||
".zip": true, ".tar": true, ".gz": true, ".bz2": true,
|
||||
".png": true, ".jpg": true, ".jpeg": true, ".gif": true,
|
||||
".mp3": true, ".mp4": true, ".avi": true, ".mov": true,
|
||||
".db": true, ".sqlite": true, ".sqlite3": true,
|
||||
}
|
||||
|
||||
for _, file := range strings.Split(strings.TrimSpace(string(out)), "\n") {
|
||||
if file == "" {
|
||||
continue
|
||||
}
|
||||
ext := strings.ToLower(filepath.Ext(file))
|
||||
if binaryExts[ext] {
|
||||
return fmt.Sprintf("binary file added: %s", file)
|
||||
}
|
||||
|
||||
// Check file size (reject > 1MB)
|
||||
fullPath := filepath.Join(srcDir, file)
|
||||
info, err := os.Stat(fullPath)
|
||||
if err == nil && info.Size() > 1024*1024 {
|
||||
return fmt.Sprintf("large file: %s (%d bytes)", file, info.Size())
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
// countChangedFiles returns the number of files changed vs main.
|
||||
func countChangedFiles(srcDir string) int {
|
||||
cmd := exec.Command("git", "diff", "--name-only", "main...HEAD")
|
||||
cmd.Dir = srcDir
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
lines := strings.Split(strings.TrimSpace(string(out)), "\n")
|
||||
if len(lines) == 1 && lines[0] == "" {
|
||||
return 0
|
||||
}
|
||||
return len(lines)
|
||||
}
|
||||
|
||||
// pushBranch pushes the agent's branch to origin.
|
||||
func pushBranch(srcDir, branch string) error {
|
||||
cmd := exec.Command("git", "push", "origin", branch)
|
||||
cmd.Dir = srcDir
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %s", err, strings.TrimSpace(string(out)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateStatus updates the workspace status.json.
|
||||
func updateStatus(wsDir, status, question string) {
|
||||
data, err := coreio.Local.Read(filepath.Join(wsDir, "status.json"))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var st map[string]any
|
||||
if json.Unmarshal([]byte(data), &st) != nil {
|
||||
return
|
||||
}
|
||||
st["status"] = status
|
||||
if question != "" {
|
||||
st["question"] = question
|
||||
}
|
||||
updated, _ := json.MarshalIndent(st, "", " ")
|
||||
coreio.Local.Write(filepath.Join(wsDir, "status.json"), string(updated))
|
||||
}
|
||||
|
|
@ -26,9 +26,16 @@ import (
|
|||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
)
|
||||
|
||||
// ChannelNotifier pushes events to connected MCP sessions.
|
||||
// Matches the Notifier interface in core/mcp without importing it.
|
||||
type ChannelNotifier interface {
|
||||
ChannelSend(ctx context.Context, channel string, data any)
|
||||
}
|
||||
|
||||
// Subsystem implements mcp.Subsystem for background monitoring.
|
||||
type Subsystem struct {
|
||||
server *mcp.Server
|
||||
notifier ChannelNotifier
|
||||
interval time.Duration
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
|
@ -43,6 +50,11 @@ type Subsystem struct {
|
|||
poke chan struct{}
|
||||
}
|
||||
|
||||
// SetNotifier wires up channel event broadcasting.
|
||||
func (m *Subsystem) SetNotifier(n ChannelNotifier) {
|
||||
m.notifier = n
|
||||
}
|
||||
|
||||
// Options configures the monitor.
|
||||
type Options struct {
|
||||
// Interval between checks (default: 2 minutes)
|
||||
|
|
@ -143,6 +155,11 @@ func (m *Subsystem) check(ctx context.Context) {
|
|||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
// Harvest completed workspaces — push branches, check for binaries
|
||||
if msg := m.harvestCompleted(); msg != "" {
|
||||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
// Check inbox
|
||||
if msg := m.checkInbox(); msg != "" {
|
||||
messages = append(messages, msg)
|
||||
|
|
@ -217,6 +234,16 @@ func (m *Subsystem) checkCompletions() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// Push channel events for each completion
|
||||
if m.notifier != nil && len(recentlyCompleted) > 0 {
|
||||
m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{
|
||||
"count": newCompletions,
|
||||
"completed": recentlyCompleted,
|
||||
"running": running,
|
||||
"queued": queued,
|
||||
})
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("%d agent(s) completed", newCompletions)
|
||||
if running > 0 {
|
||||
msg += fmt.Sprintf(", %d still running", running)
|
||||
|
|
@ -302,11 +329,15 @@ func (m *Subsystem) checkInbox() string {
|
|||
senderList = append(senderList, s)
|
||||
}
|
||||
}
|
||||
notify := fmt.Sprintf("📬 %d new message(s) from %s", unread-prevInbox, strings.Join(senderList, ", "))
|
||||
if latestSubject != "" {
|
||||
notify += fmt.Sprintf(" — \"%s\"", latestSubject)
|
||||
// Push channel event for new messages
|
||||
if m.notifier != nil {
|
||||
m.notifier.ChannelSend(context.Background(), "inbox.message", map[string]any{
|
||||
"new": unread - prevInbox,
|
||||
"total": unread,
|
||||
"senders": senderList,
|
||||
"subject": latestSubject,
|
||||
})
|
||||
}
|
||||
coreio.Local.Write("/tmp/claude-inbox-notify", notify)
|
||||
|
||||
return fmt.Sprintf("%d unread message(s) in inbox", unread)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue