agent/pkg/agentic/watch.go
Snider 04e3d492e9 fix(monitor): emit agent.completed per task, verify PIDs for queue.drained
- Export ReadStatus (was readStatus) for cross-package use
- AgentCompleted now emits agent.completed with repo/agent/workspace/status
  for every finished task, not just failures
- queue.drained only fires when genuinely empty — verified by checking
  PIDs are alive via kill(0), not just trusting stale status files
- Fix Docker mount paths: /root/ → /home/dev/ for non-root container
- Update all callers and tests

Co-Authored-By: Virgil <virgil@lethean.io>
2026-03-24 13:02:41 +00:00

216 lines
5.7 KiB
Go

// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"time"
core "dappco.re/go/core"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// WatchInput is the input for agentic_watch.
//
// input := agentic.WatchInput{Workspaces: []string{"go-io-123"}, PollInterval: 5, Timeout: 600}
// WatchInput is the input for agentic_watch.
//
// input := agentic.WatchInput{Workspaces: []string{"go-io-123"}, PollInterval: 5, Timeout: 600}
type WatchInput struct {
	// Workspaces to watch. If empty, watches all running/queued workspaces.
	Workspaces []string `json:"workspaces,omitempty"`
	// PollInterval is the delay in seconds between status checks (default: 5).
	PollInterval int `json:"poll_interval,omitempty"`
	// Timeout in seconds before pending workspaces are reported as
	// timed out (default: 600 = 10 minutes).
	Timeout int `json:"timeout,omitempty"`
}
// WatchOutput is the result when all watched workspaces complete.
//
// out := agentic.WatchOutput{Success: true, Completed: []agentic.WatchResult{{Workspace: "go-io-123", Status: "completed"}}}
// WatchOutput is the result when all watched workspaces complete.
//
// out := agentic.WatchOutput{Success: true, Completed: []agentic.WatchResult{{Workspace: "go-io-123", Status: "completed"}}}
type WatchOutput struct {
	// Success is true when no workspace failed, blocked, or timed out.
	Success bool `json:"success"`
	// Completed lists workspaces that finished in a success state
	// ("completed", "merged", or "ready-for-review").
	Completed []WatchResult `json:"completed"`
	// Failed lists workspaces that ended "failed" or "blocked", plus any
	// still pending at the deadline (reported with Status "timeout").
	Failed []WatchResult `json:"failed,omitempty"`
	// Duration is the total watch time, rounded to whole seconds.
	Duration string `json:"duration"`
}
// WatchResult describes one completed workspace.
//
// result := agentic.WatchResult{Workspace: "go-io-123", Agent: "codex", Repo: "go-io", Status: "completed"}
// WatchResult describes one completed workspace.
//
// result := agentic.WatchResult{Workspace: "go-io-123", Agent: "codex", Repo: "go-io", Status: "completed"}
type WatchResult struct {
	// Workspace is the workspace name (or path) that was watched.
	Workspace string `json:"workspace"`
	// Agent is the agent that ran in the workspace, as read from its status file.
	Agent string `json:"agent"`
	// Repo is the repository the workspace operated on.
	Repo string `json:"repo"`
	// Status is the terminal status: completed, merged, ready-for-review,
	// failed, blocked, or timeout.
	Status string `json:"status"`
	// PRURL is the pull-request URL, when the status file recorded one.
	PRURL string `json:"pr_url,omitempty"`
}
// registerWatchTool wires the agentic_watch tool into the MCP server,
// routing calls to s.watch.
func (s *PrepSubsystem) registerWatchTool(server *mcp.Server) {
	tool := &mcp.Tool{
		Name:        "agentic_watch",
		Description: "Watch running/queued agent workspaces until they all complete. Sends progress notifications as each agent finishes. Returns summary when all are done.",
	}
	mcp.AddTool(server, tool, s.watch)
}
// watch polls the status files of the target workspaces until every one
// reaches a terminal state, the timeout elapses, or ctx is cancelled.
//
// Success states ("completed", "merged", "ready-for-review") are collected
// into Completed; "failed" and "blocked" into Failed. Workspaces still
// pending at the deadline are appended to Failed with Status "timeout".
// One progress notification is sent per finished workspace when the caller
// supplied a progress token.
func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, input WatchInput) (*mcp.CallToolResult, WatchOutput, error) {
	pollInterval := time.Duration(input.PollInterval) * time.Second
	if pollInterval <= 0 {
		pollInterval = 5 * time.Second
	}
	timeout := time.Duration(input.Timeout) * time.Second
	if timeout <= 0 {
		timeout = 10 * time.Minute
	}
	start := time.Now()
	deadline := start.Add(timeout)

	// Default to all running/queued workspaces when none were named.
	targets := input.Workspaces
	if len(targets) == 0 {
		targets = s.findActiveWorkspaces()
	}
	if len(targets) == 0 {
		return nil, WatchOutput{
			Success:  true,
			Duration: "0s",
		}, nil
	}

	var completed []WatchResult
	var failed []WatchResult
	remaining := make(map[string]bool, len(targets))
	for _, ws := range targets {
		remaining[ws] = true
	}

	progressCount := float64(0)
	total := float64(len(targets))
	progressToken := req.Params.GetProgressToken()

	// notify bumps the progress counter and, when the caller supplied a
	// progress token, emits one notification per finished workspace.
	notify := func(msg string) {
		progressCount++
		if progressToken != nil && req.Session != nil {
			req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
				ProgressToken: progressToken,
				Progress:      progressCount,
				Total:         total,
				Message:       msg,
			})
		}
	}

	// One reusable ticker instead of a fresh time.After timer allocation
	// on every poll iteration.
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	// Poll until all complete or timeout.
	for len(remaining) > 0 {
		if time.Now().After(deadline) {
			// Anything still pending at the deadline is a timeout.
			for ws := range remaining {
				failed = append(failed, WatchResult{
					Workspace: ws,
					Status:    "timeout",
				})
			}
			break
		}
		select {
		case <-ctx.Done():
			return nil, WatchOutput{}, core.E("watch", "cancelled", ctx.Err())
		case <-ticker.C:
		}
		for ws := range remaining {
			st, err := ReadStatus(s.resolveWorkspaceDir(ws))
			if err != nil {
				// Status file missing or unreadable; retry next poll.
				continue
			}
			switch st.Status {
			case "completed", "merged", "ready-for-review":
				// Success states; the PR URL is carried through when
				// the status file recorded one.
				completed = append(completed, WatchResult{
					Workspace: ws,
					Agent:     st.Agent,
					Repo:      st.Repo,
					Status:    st.Status,
					PRURL:     st.PRURL,
				})
				delete(remaining, ws)
				notify(core.Sprintf("%s %s (%s)", st.Repo, st.Status, st.Agent))
			case "failed", "blocked":
				failed = append(failed, WatchResult{
					Workspace: ws,
					Agent:     st.Agent,
					Repo:      st.Repo,
					Status:    st.Status,
				})
				delete(remaining, ws)
				notify(core.Sprintf("%s %s (%s)", st.Repo, st.Status, st.Agent))
			}
		}
	}
	return nil, WatchOutput{
		Success:   len(failed) == 0,
		Completed: completed,
		Failed:    failed,
		Duration:  time.Since(start).Round(time.Second).String(),
	}, nil
}
// findActiveWorkspaces returns workspace names that are running or queued.
// findActiveWorkspaces scans every status.json under the workspace root
// and returns the base names of workspaces whose status is "running" or
// "queued". Workspaces with unreadable status files are skipped.
func (s *PrepSubsystem) findActiveWorkspaces() []string {
	var names []string
	pattern := core.JoinPath(WorkspaceRoot(), "*/status.json")
	for _, statusFile := range core.PathGlob(pattern) {
		dir := core.PathDir(statusFile)
		st, err := ReadStatus(dir)
		if err != nil {
			// Best-effort scan: ignore workspaces we cannot read.
			continue
		}
		switch st.Status {
		case "running", "queued":
			names = append(names, core.PathBase(dir))
		}
	}
	return names
}
// resolveWorkspaceDir converts a workspace name to a full path. Absolute
// paths are returned verbatim; bare names resolve under the workspace root.
func (s *PrepSubsystem) resolveWorkspaceDir(name string) string {
	if !core.PathIsAbs(name) {
		name = core.JoinPath(WorkspaceRoot(), name)
	}
	return name
}