From 6bda31345ac264a9dd3994314d6a13526e2219f6 Mon Sep 17 00:00:00 2001 From: Virgil Date: Wed, 1 Apr 2026 14:44:53 +0000 Subject: [PATCH] feat(runner): emit queue drained events Co-Authored-By: Virgil --- pkg/runner/queue.go | 7 +++++-- pkg/runner/runner.go | 15 +++++++++++---- pkg/runner/runner_test.go | 19 +++++++++++++++++++ 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/pkg/runner/queue.go b/pkg/runner/queue.go index 3137125..691f492 100644 --- a/pkg/runner/queue.go +++ b/pkg/runner/queue.go @@ -187,15 +187,18 @@ func (s *Service) countRunningByModel(agent string) int { } // s.drainQueue() -func (s *Service) drainQueue() { +func (s *Service) drainQueue() int { if s.frozen { - return + return 0 } s.drainMu.Lock() defer s.drainMu.Unlock() + completed := 0 for s.drainOne() { + completed++ } + return completed } func (s *Service) drainOne() bool { diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index c80a221..f64c63d 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -180,7 +180,7 @@ func (s *Service) HandleIPCEvents(coreApp *core.Core, msg core.Message) core.Res s.Poke() case messages.PokeQueue: - s.drainQueue() + s.drainQueueAndNotify(coreApp) _ = ev } return core.Result{OK: true} @@ -340,7 +340,7 @@ func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result { } func (s *Service) actionPoke(_ context.Context, _ core.Options) core.Result { - s.drainQueue() + s.drainQueueAndNotify(s.Core()) return core.Result{OK: true} } @@ -362,13 +362,20 @@ func (s *Service) runLoop() { for { select { case <-ticker.C: - s.drainQueue() + s.drainQueueAndNotify(s.Core()) case <-s.pokeCh: - s.drainQueue() + s.drainQueueAndNotify(s.Core()) } } } +func (s *Service) drainQueueAndNotify(coreApp *core.Core) { + completed := s.drainQueue() + if coreApp != nil { + coreApp.ACTION(messages.QueueDrained{Completed: completed}) + } +} + func (s *Service) hydrateWorkspaces() { if s.workspaces == nil { s.workspaces = core.NewRegistry[*WorkspaceStatus]() diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index 6e74286..d31d294 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -339,6 +339,25 @@ func TestRunner_HandleIPCEvents_Good_UpdatesMatchingWorkspaceOnly(t *testing.T) assert.Equal(t, 222, second.PID) } +func TestRunner_HandleIPCEvents_Good_EmitsQueueDrained(t *testing.T) { + c := core.New(core.WithOption("name", "test")) + svc := New() + svc.ServiceRuntime = core.NewServiceRuntime(c, Options{}) + + var captured []messages.QueueDrained + c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result { + if ev, ok := msg.(messages.QueueDrained); ok { + captured = append(captured, ev) + } + return core.Result{OK: true} + }) + + r := svc.HandleIPCEvents(c, messages.PokeQueue{}) + assert.True(t, r.OK) + require.Len(t, captured, 1) + assert.Equal(t, 0, captured[0].Completed) +} + func TestRunner_HydrateWorkspaces_Good_DeepWorkspaceName(t *testing.T) { root := t.TempDir() t.Setenv("CORE_WORKSPACE", root)