refactor(monitor): direct event push, no filesystem polling

CompletionNotifier interface now has AgentStarted() and
AgentCompleted() instead of Poke(). Dispatch pushes notifications
directly to monitor with agent/repo/status data. Monitor pushes
MCP channel events immediately — no scanning, no dedup maps,
no filesystem polling latency.

Events.jsonl kept as audit log only, not notification mechanism.
Timer-based scan kept for startup seeding and stale detection.

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-22 16:19:13 +00:00
parent c04d3442dd
commit 9b225af2f9
4 changed files with 68 additions and 13 deletions

View file

@ -145,8 +145,16 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
proc.CloseStdin()
pid := proc.Info().PID
// Emit start event for channel notifications
emitStartEvent(agent, core.PathBase(wsDir))
// Notify monitor directly — no filesystem polling
if s.onComplete != nil {
st, _ := readStatus(wsDir)
repo := ""
if st != nil {
repo = st.Repo
}
s.onComplete.AgentStarted(agent, repo, core.PathBase(wsDir))
}
emitStartEvent(agent, core.PathBase(wsDir)) // audit log
go func() {
ticker := time.NewTicker(5 * time.Second)
@ -190,10 +198,16 @@ func (s *PrepSubsystem) spawnAgent(agent, prompt, wsDir string) (int, string, er
writeStatus(wsDir, st)
}
emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus)
emitCompletionEvent(agent, core.PathBase(wsDir), finalStatus) // audit log
// Push notification directly — no filesystem polling
if s.onComplete != nil {
s.onComplete.Poke()
stNow, _ := readStatus(wsDir)
repoName := ""
if stNow != nil {
repoName = stNow.Repo
}
s.onComplete.AgentCompleted(agent, repoName, core.PathBase(wsDir), finalStatus)
}
if finalStatus == "completed" {

View file

@ -22,12 +22,13 @@ import (
"gopkg.in/yaml.v3"
)
// CompletionNotifier is called when an agent completes, to trigger
// immediate notifications to connected clients.
// CompletionNotifier receives agent lifecycle events directly from dispatch.
// No filesystem polling — events flow in-memory.
//
// prep.SetCompletionNotifier(monitor)
type CompletionNotifier interface {
Poke()
AgentStarted(agent, repo, workspace string)
AgentCompleted(agent, repo, workspace, status string)
}
// PrepSubsystem provides agentic MCP tools for workspace orchestration.

View file

@ -194,9 +194,14 @@ func TestSetCompletionNotifier_Good(t *testing.T) {
}
type mockNotifier struct {
poked bool
started bool
completed bool
}
func (m *mockNotifier) Poke() {
m.poked = true
func (m *mockNotifier) AgentStarted(agent, repo, workspace string) {
m.started = true
}
func (m *mockNotifier) AgentCompleted(agent, repo, workspace, status string) {
m.completed = true
}

View file

@ -214,9 +214,7 @@ func (m *Subsystem) Shutdown(_ context.Context) error {
return nil
}
// Poke triggers an immediate check cycle.
//
// mon.Poke()
// Poke triggers an immediate check cycle (legacy — prefer AgentStarted/AgentCompleted).
func (m *Subsystem) Poke() {
select {
case m.poke <- struct{}{}:
@ -224,6 +222,43 @@ func (m *Subsystem) Poke() {
}
}
// AgentStarted pushes an immediate notification when an agent spawns.
// Called directly by dispatch — no filesystem polling.
//
// mon.AgentStarted("codex:gpt-5.3-codex-spark", "go-io", "core/go-io/task-5")
func (m *Subsystem) AgentStarted(agent, repo, workspace string) {
if m.notifier != nil {
m.notifier.ChannelSend(context.Background(), "agent.started", map[string]any{
"agent": agent,
"repo": repo,
})
}
}
// AgentCompleted pushes an immediate notification when an agent finishes.
// Called directly by dispatch — no filesystem polling needed.
//
// mon.AgentCompleted("codex", "go-io", "core/go-io/task-5", "completed")
func (m *Subsystem) AgentCompleted(agent, repo, workspace, status string) {
if m.notifier != nil {
// Count current running/queued from status for context
running := 0
queued := 0
m.mu.Lock()
m.seenCompleted[workspace] = true
m.mu.Unlock()
m.notifier.ChannelSend(context.Background(), "agent.complete", map[string]any{
"completed": []string{core.Sprintf("%s (%s) [%s]", repo, agent, status)},
"count": 1,
"running": running,
"queued": queued,
})
}
// Also poke to update counts for any other monitors
m.Poke()
}
func (m *Subsystem) loop(ctx context.Context) {
// Initial check after short delay (let server fully start)
select {