2026-03-17 04:31:19 +00:00
// SPDX-License-Identifier: EUPL-1.2
package agentic
import (
"context"
"time"
2026-03-22 03:41:07 +00:00
core "dappco.re/go/core"
2026-03-17 04:31:19 +00:00
"github.com/modelcontextprotocol/go-sdk/mcp"
)
2026-03-31 04:33:36 +00:00
// input := agentic.WatchInput{Workspaces: []string{"core/go-io/task-42"}, PollInterval: 5, Timeout: 600}
2026-03-17 04:31:19 +00:00
type WatchInput struct {
2026-03-31 06:03:37 +00:00
Workspaces [ ] string ` json:"workspaces,omitempty" `
PollInterval int ` json:"poll_interval,omitempty" `
Timeout int ` json:"timeout,omitempty" `
2026-03-17 04:31:19 +00:00
}
2026-03-31 04:33:36 +00:00
// out := agentic.WatchOutput{Success: true, Completed: []agentic.WatchResult{{Workspace: "core/go-io/task-42", Status: "completed"}}}
2026-03-17 04:31:19 +00:00
type WatchOutput struct {
Success bool ` json:"success" `
Completed [ ] WatchResult ` json:"completed" `
Failed [ ] WatchResult ` json:"failed,omitempty" `
Duration string ` json:"duration" `
}
2026-03-31 04:33:36 +00:00
// result := agentic.WatchResult{Workspace: "core/go-io/task-42", Agent: "codex", Repo: "go-io", Status: "completed"}
2026-03-17 04:31:19 +00:00
type WatchResult struct {
Workspace string ` json:"workspace" `
Agent string ` json:"agent" `
Repo string ` json:"repo" `
Status string ` json:"status" `
PRURL string ` json:"pr_url,omitempty" `
}
func ( s * PrepSubsystem ) registerWatchTool ( server * mcp . Server ) {
mcp . AddTool ( server , & mcp . Tool {
Name : "agentic_watch" ,
Description : "Watch running/queued agent workspaces until they all complete. Sends progress notifications as each agent finishes. Returns summary when all are done." ,
} , s . watch )
}
2026-03-30 21:17:33 +00:00
func ( s * PrepSubsystem ) watch ( ctx context . Context , request * mcp . CallToolRequest , input WatchInput ) ( * mcp . CallToolResult , WatchOutput , error ) {
2026-03-17 04:31:19 +00:00
pollInterval := time . Duration ( input . PollInterval ) * time . Second
if pollInterval <= 0 {
pollInterval = 5 * time . Second
}
timeout := time . Duration ( input . Timeout ) * time . Second
if timeout <= 0 {
timeout = 10 * time . Minute
}
start := time . Now ( )
deadline := start . Add ( timeout )
2026-04-02 00:11:17 +00:00
workspaceNames := s . watchWorkspaceNames ( input . Workspaces )
2026-03-17 04:31:19 +00:00
2026-03-30 21:17:33 +00:00
if len ( workspaceNames ) == 0 {
2026-03-17 04:31:19 +00:00
return nil , WatchOutput {
Success : true ,
Duration : "0s" ,
} , nil
}
var completed [ ] WatchResult
var failed [ ] WatchResult
2026-03-30 21:17:33 +00:00
pendingWorkspaces := make ( map [ string ] bool )
for _ , workspaceName := range workspaceNames {
pendingWorkspaces [ workspaceName ] = true
2026-03-17 04:31:19 +00:00
}
progressCount := float64 ( 0 )
2026-03-30 21:17:33 +00:00
total := float64 ( len ( workspaceNames ) )
2026-03-17 04:31:19 +00:00
2026-03-30 00:35:33 +00:00
progressToken := any ( nil )
2026-03-30 21:17:33 +00:00
if request != nil && request . Params != nil {
progressToken = request . Params . GetProgressToken ( )
2026-03-30 00:35:33 +00:00
}
2026-03-17 04:31:19 +00:00
2026-03-30 21:17:33 +00:00
for len ( pendingWorkspaces ) > 0 {
2026-03-17 04:31:19 +00:00
if time . Now ( ) . After ( deadline ) {
2026-03-30 21:17:33 +00:00
for workspaceName := range pendingWorkspaces {
2026-03-17 04:31:19 +00:00
failed = append ( failed , WatchResult {
2026-03-30 21:17:33 +00:00
Workspace : workspaceName ,
2026-03-17 04:31:19 +00:00
Status : "timeout" ,
} )
}
break
}
select {
case <- ctx . Done ( ) :
2026-03-22 03:41:07 +00:00
return nil , WatchOutput { } , core . E ( "watch" , "cancelled" , ctx . Err ( ) )
2026-03-17 04:31:19 +00:00
case <- time . After ( pollInterval ) :
}
2026-03-30 21:17:33 +00:00
for workspaceName := range pendingWorkspaces {
workspaceDir := s . resolveWorkspaceDir ( workspaceName )
statusResult := ReadStatusResult ( workspaceDir )
workspaceStatus , ok := workspaceStatusValue ( statusResult )
2026-03-30 19:40:02 +00:00
if ! ok {
2026-03-17 04:31:19 +00:00
continue
}
2026-03-30 21:17:33 +00:00
switch workspaceStatus . Status {
2026-03-17 04:31:19 +00:00
case "completed" :
2026-03-30 21:17:33 +00:00
watchResult := WatchResult {
Workspace : workspaceName ,
Agent : workspaceStatus . Agent ,
Repo : workspaceStatus . Repo ,
2026-03-17 04:31:19 +00:00
Status : "completed" ,
2026-03-30 21:17:33 +00:00
PRURL : workspaceStatus . PRURL ,
2026-03-17 04:31:19 +00:00
}
2026-03-30 21:17:33 +00:00
completed = append ( completed , watchResult )
delete ( pendingWorkspaces , workspaceName )
2026-03-17 04:31:19 +00:00
progressCount ++
2026-03-30 21:17:33 +00:00
if request != nil && progressToken != nil && request . Session != nil {
request . Session . NotifyProgress ( ctx , & mcp . ProgressNotificationParams {
2026-03-17 04:31:19 +00:00
ProgressToken : progressToken ,
Progress : progressCount ,
Total : total ,
2026-03-30 21:17:33 +00:00
Message : core . Sprintf ( "%s completed (%s)" , workspaceStatus . Repo , workspaceStatus . Agent ) ,
2026-03-17 04:31:19 +00:00
} )
}
2026-03-21 16:53:55 +00:00
case "merged" , "ready-for-review" :
2026-03-30 21:17:33 +00:00
watchResult := WatchResult {
Workspace : workspaceName ,
Agent : workspaceStatus . Agent ,
Repo : workspaceStatus . Repo ,
Status : workspaceStatus . Status ,
PRURL : workspaceStatus . PRURL ,
2026-03-21 16:53:55 +00:00
}
2026-03-30 21:17:33 +00:00
completed = append ( completed , watchResult )
delete ( pendingWorkspaces , workspaceName )
2026-03-21 16:53:55 +00:00
progressCount ++
2026-03-30 21:17:33 +00:00
if request != nil && progressToken != nil && request . Session != nil {
request . Session . NotifyProgress ( ctx , & mcp . ProgressNotificationParams {
2026-03-21 16:53:55 +00:00
ProgressToken : progressToken ,
Progress : progressCount ,
Total : total ,
2026-03-30 21:17:33 +00:00
Message : core . Sprintf ( "%s %s (%s)" , workspaceStatus . Repo , workspaceStatus . Status , workspaceStatus . Agent ) ,
2026-03-21 16:53:55 +00:00
} )
}
2026-03-17 04:31:19 +00:00
case "failed" , "blocked" :
2026-03-30 21:17:33 +00:00
watchResult := WatchResult {
Workspace : workspaceName ,
Agent : workspaceStatus . Agent ,
Repo : workspaceStatus . Repo ,
Status : workspaceStatus . Status ,
2026-03-17 04:31:19 +00:00
}
2026-03-30 21:17:33 +00:00
failed = append ( failed , watchResult )
delete ( pendingWorkspaces , workspaceName )
2026-03-17 04:31:19 +00:00
progressCount ++
2026-03-30 21:17:33 +00:00
if request != nil && progressToken != nil && request . Session != nil {
request . Session . NotifyProgress ( ctx , & mcp . ProgressNotificationParams {
2026-03-17 04:31:19 +00:00
ProgressToken : progressToken ,
Progress : progressCount ,
Total : total ,
2026-03-30 21:17:33 +00:00
Message : core . Sprintf ( "%s %s (%s)" , workspaceStatus . Repo , workspaceStatus . Status , workspaceStatus . Agent ) ,
2026-03-17 04:31:19 +00:00
} )
}
}
}
}
return nil , WatchOutput {
Success : len ( failed ) == 0 ,
Completed : completed ,
Failed : failed ,
Duration : time . Since ( start ) . Round ( time . Second ) . String ( ) ,
} , nil
}
2026-04-02 00:11:17 +00:00
func ( s * PrepSubsystem ) watchWorkspaceNames ( workspaces [ ] string ) [ ] string {
if len ( workspaces ) == 0 {
return s . findActiveWorkspaces ( )
}
statusPaths := WorkspaceStatusPaths ( )
if len ( statusPaths ) == 0 {
return nil
}
seen := make ( map [ string ] bool )
add := func ( names [ ] string , workspaceName string ) [ ] string {
if workspaceName == "" || seen [ workspaceName ] {
return names
}
seen [ workspaceName ] = true
return append ( names , workspaceName )
}
var workspaceNames [ ] string
for _ , rawWorkspace := range workspaces {
requested := core . Trim ( rawWorkspace )
if requested == "" {
continue
}
requested = core . TrimSuffix ( requested , "/" )
matched := false
for _ , statusPath := range statusPaths {
workspaceDir := core . PathDir ( statusPath )
workspaceName := WorkspaceName ( workspaceDir )
if workspaceName == requested || workspaceDir == requested {
workspaceNames = add ( workspaceNames , workspaceName )
matched = true
}
}
prefix := requested
if ! core . HasSuffix ( prefix , "/" ) {
prefix = core . Concat ( prefix , "/" )
}
for _ , statusPath := range statusPaths {
workspaceDir := core . PathDir ( statusPath )
workspaceName := WorkspaceName ( workspaceDir )
if core . HasPrefix ( workspaceName , prefix ) || core . HasPrefix ( workspaceDir , prefix ) {
workspaceNames = add ( workspaceNames , workspaceName )
matched = true
}
}
if ! matched {
workspaceNames = add ( workspaceNames , requested )
}
}
return workspaceNames
}
2026-03-30 22:46:21 +00:00
// active := s.findActiveWorkspaces()
// if len(active) == 0 { return nil }
2026-03-17 04:31:19 +00:00
func ( s * PrepSubsystem ) findActiveWorkspaces ( ) [ ] string {
var active [ ] string
2026-03-29 20:15:58 +00:00
for _ , entry := range WorkspaceStatusPaths ( ) {
2026-03-30 21:17:33 +00:00
workspaceDir := core . PathDir ( entry )
statusResult := ReadStatusResult ( workspaceDir )
workspaceStatus , ok := workspaceStatusValue ( statusResult )
2026-03-30 19:40:02 +00:00
if ! ok {
2026-03-17 04:31:19 +00:00
continue
}
2026-03-30 21:17:33 +00:00
if workspaceStatus . Status == "running" || workspaceStatus . Status == "queued" {
active = append ( active , WorkspaceName ( workspaceDir ) )
2026-03-17 04:31:19 +00:00
}
}
return active
}
2026-03-31 04:33:36 +00:00
// dir := s.resolveWorkspaceDir("core/go-io/task-42")
2026-03-30 21:17:33 +00:00
func ( s * PrepSubsystem ) resolveWorkspaceDir ( workspaceName string ) string {
if core . PathIsAbs ( workspaceName ) {
return workspaceName
2026-03-17 04:31:19 +00:00
}
2026-03-30 21:17:33 +00:00
return core . JoinPath ( WorkspaceRoot ( ) , workspaceName )
2026-03-17 04:31:19 +00:00
}