diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index f73b9d4..11fa315 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -145,8 +145,16 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er proc.CloseStdin() pid := proc.Info().PID - // Emit start event for channel notifications - emitStartEvent(agent, core.PathBase(wsDir)) + // Notify monitor directly — no filesystem polling + if s.onComplete != nil { + st, _ := readStatus(wsDir) + repo := "" + if st != nil { + repo = st.Repo + } + s.onComplete.AgentStarted(agent, repo, core.PathBase(wsDir)) + } + emitStartEvent(agent, core.PathBase(wsDir)) // audit log go func() { ticker := time.NewTicker(5 * time.Second) @@ -190,10 +198,16 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er writeStatus(wsDir, st) } - emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) + emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) // audit log + // Push notification directly — no filesystem polling if s.onComplete != nil { - s.onComplete.Poke() + stNow, _ := readStatus(wsDir) + repoName := "" + if stNow != nil { + repoName = stNow.Repo + } + s.onComplete.AgentCompleted(agent, repoName, core.PathBase(wsDir), finalStatus) } if finalStatus == "completed" { diff --git a/pkg/agentic/prep.go b/pkg/agentic/prep.go index 7c025f8..f2f199f 100644 --- a/pkg/agentic/prep.go +++ b/pkg/agentic/prep.go @@ -22,12 +22,13 @@ import ( "gopkg.in/yaml.v3" ) -// CompletionNotifier is called when an agent completes, to trigger -// immediate notifications to connected clients. +// CompletionNotifier receives agent lifecycle events directly from dispatch. +// No filesystem polling — events flow in-memory. // // prep.SetCompletionNotifier(monitor) type CompletionNotifier interface { - Poke() + AgentStarted(agent, repo, workspace string) + AgentCompleted(agent, repo, workspace, status string) } // PrepSubsystem provides agentic MCP tools for workspace orchestration. diff --git a/pkg/agentic/prep_test.go b/pkg/agentic/prep_test.go index 07b2052..5afa5cd 100644 --- a/pkg/agentic/prep_test.go +++ b/pkg/agentic/prep_test.go @@ -194,9 +194,14 @@ func TestSetCompletionNotifier_Good(t *testing.T) { } type mockNotifier struct { - poked bool + started bool + completed bool } -func (m *mockNotifier) Poke() { - m.poked = true +func (m *mockNotifier) AgentStarted(agent, repo, workspace string) { + m.started = true +} + +func (m *mockNotifier) AgentCompleted(agent, repo, workspace, status string) { + m.completed = true } diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 0289ece..e4188b9 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -214,9 +214,7 @@ func (m *Subsystem) Shutdown(_ context.Context) error { return nil } -// Poke triggers an immediate check cycle. -// -// mon.Poke() +// Poke triggers an immediate check cycle (legacy — prefer AgentStarted/AgentCompleted). func (m *Subsystem) Poke() { select { case m.poke <- struct{}{}: @@ -224,6 +222,43 @@ func (m *Subsystem) Poke() { } } +// AgentStarted pushes an immediate notification when an agent spawns. +// Called directly by dispatch — no filesystem polling. +// +// mon.AgentStarted("codex:gpt-5.3-codex-spark", "go-io", "core/go-io/task-5") +func (m *Subsystem) AgentStarted(agent, repo, workspace string) { + if m.notifier != nil { + m.notifier.ChannelSend(context.Background(), "agent.started", map[string]any{ + "agent": agent, + "repo": repo, + }) + } +} + +// AgentCompleted pushes an immediate notification when an agent finishes. +// Called directly by dispatch — no filesystem polling needed. +// +// mon.AgentCompleted("codex", "go-io", "core/go-io/task-5", "completed") +func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) { + if m.notifier != nil { + // Count current running/queued from status for context + running := 0 + queued := 0 + m.mu.Lock() + m.seenCompleted[workspace] = true + m.mu.Unlock() + + m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{ + "completed": []string{core.Sprintf("%s (%s) [%s]", repo, agent, status)}, + "count": 1, + "running": running, + "queued": queued, + }) + } + // Also poke to update counts for any other monitors + m.Poke() +} + func (m *Subsystem) loop(ctx context.Context) { // Initial check after short delay (let server fully start) select {