From 53db749738c87100ffd0174e302feb38d4cd0384 Mon Sep 17 00:00:00 2001 From: Snider Date: Thu, 26 Mar 2026 11:23:04 +0000 Subject: [PATCH] fix(runner): reserve slot on approval to prevent TOCTOU race Runner now creates a reservation entry (PID=-1) in the workspace Registry immediately when approving a dispatch. This prevents parallel requests from all seeing count < limit before any spawn completes. Reservations are counted by countRunningByAgent/ByModel (PID < 0 = always count). Agentic overwrites with real PID via TrackWorkspace after spawn. Co-Authored-By: Virgil --- pkg/agentic/dispatch.go | 1 + pkg/runner/queue.go | 6 ++++-- pkg/runner/runner.go | 15 ++++++++++++--- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index d51537c..0e86408 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -506,6 +506,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, if s.ServiceRuntime != nil { r := s.Core().Action("runner.dispatch").Run(ctx, core.NewOptions( core.Option{Key: "agent", Value: input.Agent}, + core.Option{Key: "repo", Value: input.Repo}, )) if !r.OK { // Runner denied — queue it diff --git a/pkg/runner/queue.go b/pkg/runner/queue.go index d6287fe..363ba29 100644 --- a/pkg/runner/queue.go +++ b/pkg/runner/queue.go @@ -147,7 +147,9 @@ func (s *Service) countRunningByAgent(agent string) int { count := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { if st.Status == "running" && baseAgent(st.Agent) == agent { - if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + // PID < 0 = reservation (pending spawn), always count + // PID > 0 = verify process is alive + if st.PID < 0 || (st.PID > 0 && syscall.Kill(st.PID, 0) == nil) { count++ } } @@ -184,7 +186,7 @@ func (s *Service) countRunningByModel(agent string) int { count := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { if st.Status == "running" && st.Agent == agent { - if st.PID > 0 && syscall.Kill(st.PID, 0) == nil { + if st.PID < 0 || (st.PID > 0 && syscall.Kill(st.PID, 0) == nil) { count++ } } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index ecee01c..fe77f60 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -193,13 +193,12 @@ func (s *Service) actionDispatch(_ context.Context, opts core.Options) core.Resu if s.frozen { return core.Result{Value: "queue is frozen", OK: false} } - // Dispatch is called by agentic via IPC — the actual spawn logic - // is delegated back to agentic which owns workspace prep + prompt building. - // Runner just gates: frozen check + concurrency check. + agent := opts.String("agent") if agent == "" { agent = "codex" } + repo := opts.String("repo") s.dispatchMu.Lock() defer s.dispatchMu.Unlock() @@ -208,6 +207,16 @@ func (s *Service) actionDispatch(_ context.Context, opts core.Options) core.Resu return core.Result{Value: "queued — at concurrency limit", OK: false} } + // Reserve the slot immediately — before returning to agentic. + // Without this, parallel dispatches all see count < limit. + name := core.Concat("pending/", repo) + s.workspaces.Set(name, &WorkspaceStatus{ + Status: "running", + Agent: agent, + Repo: repo, + PID: -1, // placeholder — agentic will update with real PID via TrackWorkspace + }) + return core.Result{OK: true} }