feat(runner): emit queue drained events
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
6bb355c472
commit
6bda31345a
3 changed files with 35 additions and 6 deletions
|
|
@ -187,15 +187,18 @@ func (s *Service) countRunningByModel(agent string) int {
|
|||
}
|
||||
|
||||
// s.drainQueue()
|
||||
func (s *Service) drainQueue() {
|
||||
func (s *Service) drainQueue() int {
|
||||
if s.frozen {
|
||||
return
|
||||
return 0
|
||||
}
|
||||
s.drainMu.Lock()
|
||||
defer s.drainMu.Unlock()
|
||||
|
||||
completed := 0
|
||||
for s.drainOne() {
|
||||
completed++
|
||||
}
|
||||
return completed
|
||||
}
|
||||
|
||||
func (s *Service) drainOne() bool {
|
||||
|
|
|
|||
|
|
@ -180,7 +180,7 @@ func (s *Service) HandleIPCEvents(coreApp *core.Core, msg core.Message) core.Res
|
|||
s.Poke()
|
||||
|
||||
case messages.PokeQueue:
|
||||
s.drainQueue()
|
||||
s.drainQueueAndNotify(coreApp)
|
||||
_ = ev
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
|
|
@ -340,7 +340,7 @@ func (s *Service) actionKill(_ context.Context, _ core.Options) core.Result {
|
|||
}
|
||||
|
||||
func (s *Service) actionPoke(_ context.Context, _ core.Options) core.Result {
|
||||
s.drainQueue()
|
||||
s.drainQueueAndNotify(s.Core())
|
||||
return core.Result{OK: true}
|
||||
}
|
||||
|
||||
|
|
@ -362,13 +362,20 @@ func (s *Service) runLoop() {
|
|||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.drainQueue()
|
||||
s.drainQueueAndNotify(s.Core())
|
||||
case <-s.pokeCh:
|
||||
s.drainQueue()
|
||||
s.drainQueueAndNotify(s.Core())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) drainQueueAndNotify(coreApp *core.Core) {
|
||||
completed := s.drainQueue()
|
||||
if coreApp != nil {
|
||||
coreApp.ACTION(messages.QueueDrained{Completed: completed})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) hydrateWorkspaces() {
|
||||
if s.workspaces == nil {
|
||||
s.workspaces = core.NewRegistry[*WorkspaceStatus]()
|
||||
|
|
|
|||
|
|
@ -339,6 +339,25 @@ func TestRunner_HandleIPCEvents_Good_UpdatesMatchingWorkspaceOnly(t *testing.T)
|
|||
assert.Equal(t, 222, second.PID)
|
||||
}
|
||||
|
||||
func TestRunner_HandleIPCEvents_Good_EmitsQueueDrained(t *testing.T) {
|
||||
c := core.New(core.WithOption("name", "test"))
|
||||
svc := New()
|
||||
svc.ServiceRuntime = core.NewServiceRuntime(c, Options{})
|
||||
|
||||
var captured []messages.QueueDrained
|
||||
c.RegisterAction(func(_ *core.Core, msg core.Message) core.Result {
|
||||
if ev, ok := msg.(messages.QueueDrained); ok {
|
||||
captured = append(captured, ev)
|
||||
}
|
||||
return core.Result{OK: true}
|
||||
})
|
||||
|
||||
r := svc.HandleIPCEvents(c, messages.PokeQueue{})
|
||||
assert.True(t, r.OK)
|
||||
require.Len(t, captured, 1)
|
||||
assert.Equal(t, 0, captured[0].Completed)
|
||||
}
|
||||
|
||||
func TestRunner_HydrateWorkspaces_Good_DeepWorkspaceName(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
t.Setenv("CORE_WORKSPACE", root)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue