fix(dispatch): concurrency, queue runner, and path improvements

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-29 20:40:20 +01:00
parent bf27743c44
commit 6bb4fb8d57
11 changed files with 134 additions and 150 deletions

View file

@ -2,7 +2,7 @@
# Shared between CLI and IDE extension
model = "gpt-5.4"
model_reasoning_effort = "high"
model_reasoning_effort = "extra-high"
approval_policy = "on-request"
sandbox_mode = "workspace-write"
personality = "pragmatic"

37
go.mod
View file

@ -7,9 +7,6 @@ require (
dappco.re/go/core/api v0.2.0
dappco.re/go/core/process v0.3.0
dappco.re/go/core/ws v0.3.0
forge.lthn.ai/core/api v0.1.6
forge.lthn.ai/core/cli v0.3.7
forge.lthn.ai/core/mcp v0.4.8
github.com/gin-gonic/gin v1.12.0
github.com/gorilla/websocket v1.5.3
github.com/modelcontextprotocol/go-sdk v1.4.1
@ -20,26 +17,12 @@ require (
require dappco.re/go/core/forge v0.2.0 // indirect
require (
dappco.re/go/core/i18n v0.2.0
dappco.re/go/core/io v0.2.0 // indirect
dappco.re/go/core/log v0.1.0 // indirect
dappco.re/go/core/scm v0.4.0
dappco.re/go/core/store v0.2.0
forge.lthn.ai/core/go v0.3.3 // indirect
forge.lthn.ai/core/go-ai v0.1.12 // indirect
forge.lthn.ai/core/go-i18n v0.1.7 // indirect
forge.lthn.ai/core/go-inference v0.1.7 // indirect
forge.lthn.ai/core/go-io v0.1.7 // indirect
forge.lthn.ai/core/go-log v0.0.4 // indirect
forge.lthn.ai/core/go-process v0.2.9 // indirect
forge.lthn.ai/core/go-rag v0.1.11 // indirect
forge.lthn.ai/core/go-webview v0.1.7 // indirect
forge.lthn.ai/core/go-ws v0.2.5 // indirect
github.com/99designs/gqlgen v0.17.88 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/agnivade/levenshtein v1.2.1 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/bmatcuk/doublestar/v4 v4.10.0 // indirect
github.com/buger/jsonparser v1.1.2 // indirect
@ -49,19 +32,10 @@ require (
github.com/casbin/casbin/v2 v2.135.0 // indirect
github.com/casbin/govaluate v1.10.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/charmbracelet/bubbletea v1.3.10 // indirect
github.com/charmbracelet/colorprofile v0.4.3 // indirect
github.com/charmbracelet/lipgloss v1.1.1-0.20250404203927-76690c660834 // indirect
github.com/charmbracelet/x/ansi v0.11.6 // indirect
github.com/charmbracelet/x/cellbuf v0.0.15 // indirect
github.com/charmbracelet/x/term v0.2.2 // indirect
github.com/clipperhouse/displaywidth v0.11.0 // indirect
github.com/clipperhouse/uax29/v2 v2.7.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
github.com/coreos/go-oidc/v3 v3.17.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/gabriel-vasile/mimetype v1.4.13 // indirect
github.com/gin-contrib/authz v1.0.6 // indirect
github.com/gin-contrib/cors v1.7.6 // indirect
@ -101,20 +75,13 @@ require (
github.com/gorilla/securecookie v1.1.2 // indirect
github.com/gorilla/sessions v1.4.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lucasb-eyer/go-colorful v1.3.0 // indirect
github.com/mailru/easyjson v0.9.2 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-localereader v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.21 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/muesli/ansi v0.0.0-20230316100256-276c6243b2f6 // indirect
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/termenv v0.16.0 // indirect
github.com/ollama/ollama v0.18.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
@ -122,12 +89,9 @@ require (
github.com/quic-go/qpack v0.6.0 // indirect
github.com/quic-go/quic-go v0.59.0 // indirect
github.com/redis/go-redis/v9 v9.18.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/segmentio/asm v1.2.1 // indirect
github.com/segmentio/encoding v0.5.4 // indirect
github.com/sosodev/duration v1.4.0 // indirect
github.com/spf13/cobra v1.10.2 // indirect
github.com/spf13/pflag v1.0.10 // indirect
github.com/swaggo/files v1.0.1 // indirect
github.com/swaggo/gin-swagger v1.6.1 // indirect
github.com/swaggo/swag v1.16.6 // indirect
@ -135,7 +99,6 @@ require (
github.com/ugorji/go/codec v1.3.1 // indirect
github.com/vektah/gqlparser/v2 v2.5.32 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
go.mongodb.org/mongo-driver/v2 v2.5.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect

View file

@ -118,19 +118,19 @@ func parseCoreDeps(gomod string) []coreDep {
}
// Match forge.lthn.ai/core/* requires (legacy paths)
if core.HasPrefix(line, "forge.lthn.ai/core/") {
parts := core.Split(line, " ")
mod := parts[0]
if seen[mod] {
continue
}
seen[mod] = true
suffix := core.TrimPrefix(mod, "forge.lthn.ai/core/")
repo := suffix
dir := core.Concat("core-", core.Replace(repo, "/", "-"))
deps = append(deps, coreDep{module: mod, repo: repo, dir: dir})
}
//if core.HasPrefix(line, "forge.lthn.ai/core/") {
// parts := core.Split(line, " ")
// mod := parts[0]
// if seen[mod] {
// continue
// }
// seen[mod] = true
//
// suffix := core.TrimPrefix(mod, "forge.lthn.ai/core/")
// repo := suffix
// dir := core.Concat("core-", core.Replace(repo, "/", "-"))
// deps = append(deps, coreDep{module: mod, repo: repo, dir: dir})
//}
}
return deps

View file

@ -334,7 +334,7 @@ func (s *PrepSubsystem) onAgentComplete(agent, wsDir, outputFile string, exitCod
st.PID = 0
st.Question = question
writeStatus(wsDir, st)
s.TrackWorkspace(core.PathBase(wsDir), st)
s.TrackWorkspace(WorkspaceName(wsDir), st)
}
// Rate-limit tracking
@ -468,6 +468,21 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
input.Template = "coding"
}
// Concurrency check — ask the runner
r := s.Core().Action("runner.dispatch").Run(ctx, core.NewOptions(
core.Option{Key: "agent", Value: input.Agent},
core.Option{Key: "repo", Value: input.Repo},
))
if !r.OK {
reason, _ := r.Value.(string)
out := DispatchOutput{
Repo: input.Repo,
Success: true,
OutputFile: core.Concat("queued — ", reason),
}
return nil, out, nil
}
// Step 1: Prep workspace — clone + build prompt
prepInput := PrepInput{
Repo: input.Repo,
@ -522,7 +537,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
}
writeStatus(wsDir, st)
if runnerSvc, ok := core.ServiceFor[workspaceTracker](s.Core(), "runner"); ok {
runnerSvc.TrackWorkspace(core.PathBase(wsDir), st)
runnerSvc.TrackWorkspace(WorkspaceName(wsDir), st)
}
return nil, DispatchOutput{
Success: true,
@ -555,7 +570,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
// Track in runner's registry (runner owns workspace state)
if s.ServiceRuntime != nil {
if runnerSvc, ok := core.ServiceFor[workspaceTracker](s.Core(), "runner"); ok {
runnerSvc.TrackWorkspace(core.PathBase(wsDir), st)
runnerSvc.TrackWorkspace(WorkspaceName(wsDir), st)
}
}

View file

@ -45,7 +45,7 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res
st.PID = pid
writeStatus(wsDir, st)
if runnerSvc, ok := core.ServiceFor[workspaceTracker](c, "runner"); ok {
runnerSvc.TrackWorkspace(core.PathBase(wsDir), st)
runnerSvc.TrackWorkspace(WorkspaceName(wsDir), st)
}
}
_ = outputFile

View file

@ -568,10 +568,9 @@ func TestQueue_BaseAgent_Good_NoVariant(t *testing.T) {
assert.Equal(t, "gemini", baseAgent("gemini"))
}
func TestQueue_BaseAgent_Good_CodexSparkSpecialCase(t *testing.T) {
// codex-spark variants map to their own pool name
assert.Equal(t, "codex-spark", baseAgent("codex:gpt-5.3-codex-spark"))
assert.Equal(t, "codex-spark", baseAgent("codex-spark"))
func TestQueue_BaseAgent_Good_CodexSpark(t *testing.T) {
// spark is codex, not a separate pool
assert.Equal(t, "codex", baseAgent("codex:gpt-5.3-codex-spark"))
}
func TestQueue_BaseAgent_Bad_EmptyString(t *testing.T) {

View file

@ -29,6 +29,20 @@ func WorkspaceRoot() string {
return core.JoinPath(CoreRoot(), "workspace")
}
// WorkspaceName extracts the unique workspace name from a full path.
// Given /Users/snider/Code/.core/workspace/core/go-io/dev → core/go-io/dev
//
// name := agentic.WorkspaceName("/Users/snider/Code/.core/workspace/core/go-io/dev")
func WorkspaceName(wsDir string) string {
root := WorkspaceRoot()
name := core.TrimPrefix(wsDir, root)
name = core.TrimPrefix(name, "/")
if name == "" {
return core.PathBase(wsDir)
}
return name
}
// CoreRoot returns the root directory for core ecosystem files.
// Checks CORE_WORKSPACE env var first, falls back to ~/Code/.core.
//

View file

@ -240,10 +240,6 @@ func (s *PrepSubsystem) countRunningByModel(agent string) int {
// baseAgent strips the model variant (gemini:flash → gemini).
// Spark variants (codex:gpt-5.3-codex-spark) map to the plain codex
// pool — spark is not a separate pool.
func baseAgent(agent string) string {
	return core.SplitN(agent, ":", 2)[0]
}
@ -370,7 +366,7 @@ func (s *PrepSubsystem) drainOne() bool {
st.PID = pid
st.Runs++
writeStatus(wsDir, st)
s.TrackWorkspace(core.PathBase(wsDir), st)
s.TrackWorkspace(WorkspaceName(wsDir), st)
return true
}

View file

@ -4,7 +4,6 @@ package runner
import (
"strconv"
"syscall"
"time"
core "dappco.re/go/core"
@ -103,7 +102,7 @@ func (s *Service) loadAgentsConfig() *AgentsConfig {
// canDispatchAgent checks both pool-level and per-model concurrency limits.
//
// if !s.canDispatchAgent("codex") { /* queue it */ }
func (s *Service) canDispatchAgent(agent string) bool {
func (s *Service) canDispatchAgent(agent string) (bool, string) {
var concurrency map[string]ConcurrencyLimit
if s.ServiceRuntime != nil {
r := s.Core().Config().Get("agents.concurrency")
@ -119,102 +118,53 @@ func (s *Service) canDispatchAgent(agent string) bool {
base := baseAgent(agent)
limit, ok := concurrency[base]
if !ok || limit.Total <= 0 {
return true
return true, ""
}
if s.countRunningByAgent(base) >= limit.Total {
return false
running := s.countRunningByAgent(base)
if running >= limit.Total {
return false, core.Sprintf("total %d/%d", running, limit.Total)
}
if limit.Models != nil {
model := modelVariant(agent)
if model != "" {
modelRunning := s.countRunningByModel(agent)
if modelLimit, has := limit.Models[model]; has && modelLimit > 0 {
if s.countRunningByModel(agent) >= modelLimit {
return false
if modelRunning >= modelLimit {
return false, core.Sprintf("model %s %d/%d", model, modelRunning, modelLimit)
}
}
}
}
return true
return true, ""
}
// countRunningByAgent counts running workspaces using the in-memory Registry.
//
// n := s.countRunningByAgent("codex")
func (s *Service) countRunningByAgent(agent string) int {
	if s.workspaces == nil || s.workspaces.Len() == 0 {
		// Registry not populated — fall back to scanning status files on disk.
		return s.countRunningByAgentDisk(agent)
	}
	running := 0
	s.workspaces.Each(func(_ string, ws *WorkspaceStatus) {
		if ws.Status != "running" || baseAgent(ws.Agent) != agent {
			return
		}
		// PID < 0 is a reservation (pending spawn) and always counts;
		// PID > 0 counts only while the process is still alive.
		if ws.PID < 0 || (ws.PID > 0 && syscall.Kill(ws.PID, 0) == nil) {
			running++
		}
	})
	return running
}
// countRunningByAgentDisk counts running workspaces for an agent pool by
// scanning status.json files under the workspace root (both the legacy
// flat layout and the nested org/repo/branch layout). Only entries whose
// PID responds to signal 0 are counted — dead processes are skipped.
func (s *Service) countRunningByAgentDisk(agent string) int {
	wsRoot := WorkspaceRoot()
	paths := append(
		core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")),
		core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))...,
	)
	running := 0
	for _, statusPath := range paths {
		st, err := ReadStatus(core.PathDir(statusPath))
		if err != nil || st.Status != "running" || baseAgent(st.Agent) != agent {
			continue
		}
		if st.PID > 0 && syscall.Kill(st.PID, 0) == nil {
			running++
		}
	}
	return running
}
// countRunningByModel counts running workspaces for a specific agent:model.
func (s *Service) countRunningByModel(agent string) int {
	if s.workspaces != nil && s.workspaces.Len() > 0 {
		running := 0
		s.workspaces.Each(func(_ string, ws *WorkspaceStatus) {
			if ws.Status != "running" || ws.Agent != agent {
				return
			}
			// PID < 0 = reservation (pending spawn); PID > 0 must be alive.
			if ws.PID < 0 || (ws.PID > 0 && syscall.Kill(ws.PID, 0) == nil) {
				running++
			}
		})
		return running
	}
	// Registry empty — scan status.json files (legacy flat and nested
	// layouts). The disk path only counts live PIDs (> 0).
	wsRoot := WorkspaceRoot()
	paths := append(
		core.PathGlob(core.JoinPath(wsRoot, "*", "status.json")),
		core.PathGlob(core.JoinPath(wsRoot, "*", "*", "*", "status.json"))...,
	)
	running := 0
	for _, statusPath := range paths {
		st, err := ReadStatus(core.PathDir(statusPath))
		if err != nil || st.Status != "running" || st.Agent != agent {
			continue
		}
		if st.PID > 0 && syscall.Kill(st.PID, 0) == nil {
			running++
		}
	}
	return running
}
// drainQueue fills available concurrency slots from queued workspaces.
func (s *Service) drainQueue() {
if s.frozen {
@ -240,7 +190,7 @@ func (s *Service) drainOne() bool {
continue
}
if !s.canDispatchAgent(st.Agent) {
if can, _ := s.canDispatchAgent(st.Agent); !can {
continue
}
@ -254,7 +204,7 @@ func (s *Service) drainOne() bool {
time.Sleep(delay)
}
if !s.canDispatchAgent(st.Agent) {
if can, _ := s.canDispatchAgent(st.Agent); !can {
continue
}
@ -345,9 +295,6 @@ func (s *Service) delayForAgent(agent string) time.Duration {
// --- Helpers ---
// baseAgent strips the model variant (gemini:flash → gemini).
// Spark variants (codex:gpt-5.3-codex-spark) share the codex pool —
// there is no separate codex-spark pool.
func baseAgent(agent string) string {
	return core.SplitN(agent, ":", 2)[0]
}

View file

@ -65,8 +65,8 @@ func TestQueue_BaseAgent_Good_WithModel(t *testing.T) {
assert.Equal(t, "claude", baseAgent("claude:haiku"))
}
func TestQueue_BaseAgent_Bad_CodexSpark(t *testing.T) {
assert.Equal(t, "codex-spark", baseAgent("codex:gpt-5.3-codex-spark"))
func TestQueue_BaseAgent_Good_CodexSpark(t *testing.T) {
assert.Equal(t, "codex", baseAgent("codex:gpt-5.3-codex-spark"))
}
func TestQueue_BaseAgent_Ugly_Empty(t *testing.T) {

View file

@ -61,6 +61,12 @@ func Register(c *core.Core) core.Result {
c.Config().Set("agents.concurrency", cfg.Concurrency)
c.Config().Set("agents.rates", cfg.Rates)
c.Config().Set("agents.dispatch", cfg.Dispatch)
c.Config().Set("agents.config_path", core.JoinPath(CoreRoot(), "agents.yaml"))
codexTotal := 0
if cl, ok := cfg.Concurrency["codex"]; ok {
codexTotal = cl.Total
}
c.Config().Set("agents.codex_limit_debug", codexTotal)
return core.Result{Value: svc, OK: true}
}
@ -123,13 +129,26 @@ func (s *Service) OnShutdown(_ context.Context) core.Result {
func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
switch ev := msg.(type) {
case messages.AgentStarted:
base := baseAgent(ev.Agent)
running := s.countRunningByAgent(base)
var limit int
r := c.Config().Get("agents.concurrency")
if r.OK {
if concurrency, ok := r.Value.(map[string]ConcurrencyLimit); ok {
if cl, has := concurrency[base]; has {
limit = cl.Total
}
}
}
c.ACTION(coremcp.ChannelPush{
Channel: "agent.status",
Data: map[string]any{
"agent": ev.Agent,
"repo": ev.Repo,
"workspace": ev.Workspace,
"status": "started",
Data: &AgentNotification{
Status: "started",
Repo: ev.Repo,
Agent: ev.Agent,
Workspace: ev.Workspace,
Running: running,
Limit: limit,
},
})
@ -141,13 +160,26 @@ func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
st.PID = 0
}
})
cBase := baseAgent(ev.Agent)
cRunning := s.countRunningByAgent(cBase)
var cLimit int
cr := c.Config().Get("agents.concurrency")
if cr.OK {
if concurrency, ok := cr.Value.(map[string]ConcurrencyLimit); ok {
if cl, has := concurrency[cBase]; has {
cLimit = cl.Total
}
}
}
c.ACTION(coremcp.ChannelPush{
Channel: "agent.status",
Data: map[string]any{
"agent": ev.Agent,
"repo": ev.Repo,
"workspace": ev.Workspace,
"status": ev.Status,
Data: &AgentNotification{
Status: ev.Status,
Repo: ev.Repo,
Agent: ev.Agent,
Workspace: ev.Workspace,
Running: cRunning,
Limit: cLimit,
},
})
s.Poke()
@ -193,6 +225,8 @@ func (s *Service) TrackWorkspace(name string, st any) {
var ws WorkspaceStatus
if r := core.JSONUnmarshalString(json, &ws); r.OK {
s.workspaces.Set(name, &ws)
// Remove pending reservation now that the real workspace is tracked
s.workspaces.Delete(core.Concat("pending/", ws.Repo))
}
}
@ -219,18 +253,18 @@ func (s *Service) actionDispatch(_ context.Context, opts core.Options) core.Resu
s.dispatchMu.Lock()
defer s.dispatchMu.Unlock()
if !s.canDispatchAgent(agent) {
return core.Result{Value: "queued — at concurrency limit", OK: false}
can, reason := s.canDispatchAgent(agent)
if !can {
return core.Result{Value: core.Concat("queued — ", reason), OK: false}
}
// Reserve the slot immediately — before returning to agentic.
// Without this, parallel dispatches all see count < limit.
name := core.Concat("pending/", repo)
s.workspaces.Set(name, &WorkspaceStatus{
Status: "running",
Agent: agent,
Repo: repo,
PID: -1, // placeholder — agentic will update with real PID via TrackWorkspace
PID: -1,
})
return core.Result{OK: true}
@ -335,6 +369,10 @@ func (s *Service) hydrateWorkspaces() {
if err != nil || st == nil {
continue
}
// Re-queue running agents on restart — process is dead, re-dispatch
if st.Status == "running" {
st.Status = "queued"
}
name := core.TrimPrefix(wsDir, wsRoot)
name = core.TrimPrefix(name, "/")
s.workspaces.Set(name, st)
@ -344,6 +382,18 @@ func (s *Service) hydrateWorkspaces() {
// --- Types ---
// AgentNotification is the channel push payload for agent status updates.
// Fields marshal in struct declaration order (encoding/json ignores tag
// order), so status and repo are declared first and remain readable even
// when a notification is truncated.
type AgentNotification struct {
	Status string `json:"status"` // lifecycle state, e.g. "started"
	Repo string `json:"repo"` // repository the agent is working on
	Agent string `json:"agent"` // agent identifier, may include a model variant ("agent:model")
	Workspace string `json:"workspace"` // workspace this run belongs to
	Running int `json:"running"` // current running count for the agent's pool
	Limit int `json:"limit"` // configured pool concurrency limit (0 when no limit configured)
}
// WorkspaceQuery is the QUERY type for workspace lookups.
//
// r := c.QUERY(runner.WorkspaceQuery{Status: "running"})