From f9d36cab0be7d858f8413b4ee46a404df0bf9317 Mon Sep 17 00:00:00 2001 From: Virgil Date: Sun, 29 Mar 2026 23:45:48 +0000 Subject: [PATCH] fix(ax): align brain and runner result flows Co-Authored-By: Virgil --- pkg/agentic/handlers.go | 19 +++++++++------- pkg/brain/brain.go | 20 ++++++++-------- pkg/brain/direct.go | 39 +++++++++++++++++++------------- pkg/brain/direct_test.go | 34 +++++++++++++++++++--------- pkg/brain/messaging.go | 34 ++++++++++++++++------------ pkg/brain/provider.go | 7 ++++-- pkg/brain/register.go | 3 ++- pkg/runner/paths.go | 8 +++---- pkg/runner/queue.go | 49 ++++++++++++++++++++++++++++------------ 9 files changed, 132 insertions(+), 81 deletions(-) diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index fb4cbbb..4538c02 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -14,10 +14,8 @@ import ( // HandleIPCEvents implements Core's IPC handler interface. // Auto-registered by WithService — no manual wiring needed. // -// Handles: -// -// AgentCompleted → ingest findings (runner handles channel push + queue poke) -// SpawnQueued → runner asks agentic to spawn a queued workspace +// _ = prep.HandleIPCEvents(c, messages.AgentCompleted{Workspace: "core/go-io/task-5"}) +// _ = prep.HandleIPCEvents(c, messages.SpawnQueued{Workspace: "core/go-io/task-5", Agent: "codex", Task: "fix tests"}) func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { switch ev := msg.(type) { case messages.AgentCompleted: @@ -29,7 +27,6 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res } case messages.SpawnQueued: - // Runner asks agentic to spawn a queued workspace // Runner asks agentic to spawn a queued workspace wsDir := resolveWorkspace(ev.Workspace) if wsDir == "" { @@ -57,10 +54,16 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res // SpawnFromQueue spawns an agent in a pre-prepped workspace. // Called by runner.Service via ServiceFor interface matching. // -// pid, err := prep.SpawnFromQueue("codex", prompt, wsDir) -func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, wsDir string) (int, error) { +// r := prep.SpawnFromQueue("codex", prompt, wsDir) +// pid := r.Value.(int) +func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, wsDir string) core.Result { pid, _, err := s.spawnAgent(agent, prompt, wsDir) - return pid, err + if err != nil { + return core.Result{ + Value: core.E("agentic.SpawnFromQueue", "failed to spawn queued agent", err), + } + } + return core.Result{Value: pid, OK: true} } // resolveWorkspace converts a workspace name back to the full path. diff --git a/pkg/brain/brain.go b/pkg/brain/brain.go index 12fe113..c27c63d 100644 --- a/pkg/brain/brain.go +++ b/pkg/brain/brain.go @@ -1,9 +1,9 @@ // SPDX-License-Identifier: EUPL-1.2 -// Package brain gives MCP and HTTP services a shared OpenBrain surface. +// Package brain gives MCP and HTTP services the same OpenBrain capability map. // -// sub := brain.New(bridge) -// sub.RegisterTools(server) +// sub := brain.New(nil) +// core.Println(sub.Name()) package brain import ( @@ -31,18 +31,18 @@ func fieldString(values map[string]any, key string) string { // but it has not been initialised (headless mode). var errBridgeNotAvailable = core.E("brain", "bridge not available", nil) -// Subsystem routes brain_* MCP tools through the shared IDE bridge. +// Subsystem routes `brain_*` MCP tools through the shared IDE bridge. // -// sub := brain.New(bridge) -// sub.RegisterTools(server) +// sub := brain.New(nil) +// core.Println(sub.Name()) // "brain" type Subsystem struct { bridge *ide.Bridge } -// New builds the bridge-backed OpenBrain subsystem. +// New builds the bridge-backed OpenBrain subsystem used by MCP. // -// sub := brain.New(bridge) -// _ = sub.Shutdown(context.Background()) +// sub := brain.New(nil) +// core.Println(sub.Name()) func New(bridge *ide.Bridge) *Subsystem { return &Subsystem{bridge: bridge} } @@ -54,7 +54,7 @@ func (s *Subsystem) Name() string { return "brain" } // RegisterTools publishes the bridge-backed brain tools on an MCP server. // -// sub := brain.New(bridge) +// sub := brain.New(nil) // sub.RegisterTools(server) func (s *Subsystem) RegisterTools(server *mcp.Server) { s.registerBrainTools(server) diff --git a/pkg/brain/direct.go b/pkg/brain/direct.go index fcc25fd..b96fc35 100644 --- a/pkg/brain/direct.go +++ b/pkg/brain/direct.go @@ -15,7 +15,7 @@ import ( // DirectSubsystem talks to OpenBrain over HTTP without the IDE bridge. // // sub := brain.NewDirect() -// sub.RegisterTools(server) +// core.Println(sub.Name()) // "brain" type DirectSubsystem struct { apiURL string apiKey string @@ -26,7 +26,7 @@ var _ coremcp.Subsystem = (*DirectSubsystem)(nil) // NewDirect builds the HTTP-backed OpenBrain subsystem. // // sub := brain.NewDirect() -// sub.RegisterTools(server) +// core.Println(sub.Name()) func NewDirect() *DirectSubsystem { apiURL := core.Env("CORE_BRAIN_URL") if apiURL == "" { @@ -61,7 +61,7 @@ func NewDirect() *DirectSubsystem { // name := sub.Name() // "brain" func (s *DirectSubsystem) Name() string { return "brain" } -// RegisterTools publishes the direct brain_* and agent_* tools on an MCP server. +// RegisterTools publishes the direct `brain_*` and `agent_*` tools on an MCP server. // // sub := brain.NewDirect() // sub.RegisterTools(server) @@ -104,9 +104,11 @@ func brainHomeDir() string { return core.Env("DIR_HOME") } -func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body any) (map[string]any, error) { +func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body any) core.Result { if s.apiKey == "" { - return nil, core.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil) + return core.Result{ + Value: core.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil), + } } requestURL := core.Concat(s.apiURL, path) @@ -117,20 +119,20 @@ func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body r := agentic.HTTPDo(ctx, method, requestURL, bodyStr, s.apiKey, "Bearer") if !r.OK { core.Error("brain API call failed", "method", method, "path", path) - return nil, core.E("brain.apiCall", "API call failed", nil) + return core.Result{Value: core.E("brain.apiCall", "API call failed", nil)} } var result map[string]any if ur := core.JSONUnmarshalString(r.Value.(string), &result); !ur.OK { core.Error("brain API response parse failed", "method", method, "path", path) - return nil, core.E("brain.apiCall", "parse response", nil) + return core.Result{Value: core.E("brain.apiCall", "parse response", nil)} } - return result, nil + return core.Result{Value: result, OK: true} } func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) { - result, err := s.apiCall(ctx, "POST", "/v1/brain/remember", map[string]any{ + result := s.apiCall(ctx, "POST", "/v1/brain/remember", map[string]any{ "content": input.Content, "type": input.Type, "tags": input.Tags, @@ -140,11 +142,13 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, "expires_in": input.ExpiresIn, "agent_id": agentic.AgentName(), }) - if err != nil { + if !result.OK { + err, _ := result.Value.(error) return nil, RememberOutput{}, err } - id, _ := result["id"].(string) + payload, _ := result.Value.(map[string]any) + id, _ := payload["id"].(string) return nil, RememberOutput{ Success: true, MemoryID: id, @@ -174,13 +178,15 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in body["top_k"] = 10 } - result, err := s.apiCall(ctx, "POST", "/v1/brain/recall", body) - if err != nil { + result := s.apiCall(ctx, "POST", "/v1/brain/recall", body) + if !result.OK { + err, _ := result.Value.(error) return nil, RecallOutput{}, err } var memories []Memory - if mems, ok := result["memories"].([]any); ok { + payload, _ := result.Value.(map[string]any) + if mems, ok := payload["memories"].([]any); ok { for _, m := range mems { if mm, ok := m.(map[string]any); ok { mem := Memory{ @@ -217,8 +223,9 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in } func (s *DirectSubsystem) forget(ctx context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) { - _, err := s.apiCall(ctx, "DELETE", core.Concat("/v1/brain/forget/", input.ID), nil) - if err != nil { + result := s.apiCall(ctx, "DELETE", core.Concat("/v1/brain/forget/", input.ID), nil) + if !result.OK { + err, _ := result.Value.(error) return nil, ForgetOutput{}, err } diff --git a/pkg/brain/direct_test.go b/pkg/brain/direct_test.go index bdf25ba..6eaa025 100644 --- a/pkg/brain/direct_test.go +++ b/pkg/brain/direct_test.go @@ -82,7 +82,9 @@ func TestDirect_Subsystem_Good_Shutdown(t *testing.T) { func TestDirect_ApiCall_Bad_NoAPIKey(t *testing.T) { sub := &DirectSubsystem{apiURL: "http://localhost", apiKey: ""} - _, err := sub.apiCall(context.Background(), "GET", "/test", nil) + result := sub.apiCall(context.Background(), "GET", "/test", nil) + require.False(t, result.OK) + err, _ := result.Value.(error) require.Error(t, err) assert.Contains(t, err.Error(), "no API key") } @@ -99,9 +101,10 @@ func TestDirect_ApiCall_Good_GET(t *testing.T) { })) defer srv.Close() - result, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) - require.NoError(t, err) - assert.Equal(t, "ok", result["status"]) + result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) + require.True(t, result.OK) + payload, _ := result.Value.(map[string]any) + assert.Equal(t, "ok", payload["status"]) } func TestDirect_ApiCall_Good_POSTWithBody(t *testing.T) { @@ -118,16 +121,19 @@ func TestDirect_ApiCall_Good_POSTWithBody(t *testing.T) { })) defer srv.Close() - result, err := newTestDirect(srv).apiCall(context.Background(), "POST", "/v1/brain/remember", map[string]any{"content": "hello"}) - require.NoError(t, err) - assert.Equal(t, "mem-123", result["id"]) + result := newTestDirect(srv).apiCall(context.Background(), "POST", "/v1/brain/remember", map[string]any{"content": "hello"}) + require.True(t, result.OK) + payload, _ := result.Value.(map[string]any) + assert.Equal(t, "mem-123", payload["id"]) } func TestDirect_ApiCall_Bad_ServerError(t *testing.T) { srv := httptest.NewServer(errorHandler(http.StatusInternalServerError, `{"error":"internal"}`)) defer srv.Close() - _, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) + result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) + require.False(t, result.OK) + err, _ := result.Value.(error) require.Error(t, err) assert.Contains(t, err.Error(), "API call failed") } @@ -139,14 +145,18 @@ func TestDirect_ApiCall_Bad_InvalidJSON(t *testing.T) { })) defer srv.Close() - _, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) + result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) + require.False(t, result.OK) + err, _ := result.Value.(error) require.Error(t, err) assert.Contains(t, err.Error(), "parse response") } func TestDirect_ApiCall_Bad_ConnectionRefused(t *testing.T) { sub := &DirectSubsystem{apiURL: "http://127.0.0.1:1", apiKey: "test-key"} - _, err := sub.apiCall(context.Background(), "GET", "/v1/test", nil) + result := sub.apiCall(context.Background(), "GET", "/v1/test", nil) + require.False(t, result.OK) + err, _ := result.Value.(error) require.Error(t, err) assert.Contains(t, err.Error(), "API call failed") } @@ -155,7 +165,9 @@ func TestDirect_ApiCall_Bad_BadRequest(t *testing.T) { srv := httptest.NewServer(errorHandler(http.StatusBadRequest, `{"error":"bad input"}`)) defer srv.Close() - _, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) + result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil) + require.False(t, result.OK) + err, _ := result.Value.(error) require.Error(t, err) assert.Contains(t, err.Error(), "API call failed") } diff --git a/pkg/brain/messaging.go b/pkg/brain/messaging.go index 7204749..f3bf2e7 100644 --- a/pkg/brain/messaging.go +++ b/pkg/brain/messaging.go @@ -33,7 +33,7 @@ func (s *DirectSubsystem) RegisterMessagingTools(server *mcp.Server) { // Input/Output types -// SendInput sends a direct message to another agent. +// SendInput is the payload for `agent_send`. // // brain.SendInput{To: "charon", Subject: "status update", Content: "deploy complete"} type SendInput struct { @@ -42,7 +42,7 @@ type SendInput struct { Subject string `json:"subject,omitempty"` } -// SendOutput reports the created direct message. +// SendOutput reports the stored direct message. // // brain.SendOutput{Success: true, ID: 42, To: "charon"} type SendOutput struct { @@ -51,14 +51,14 @@ type SendOutput struct { To string `json:"to"` } -// InboxInput selects which agent inbox to read. +// InboxInput selects which inbox `agent_inbox` should read. // // brain.InboxInput{Agent: "cladius"} type InboxInput struct { Agent string `json:"agent,omitempty"` } -// MessageItem is one inbox or conversation message. +// MessageItem is one inbox or conversation entry. // // brain.MessageItem{ID: 7, From: "cladius", To: "charon", Content: "all green"} type MessageItem struct { @@ -71,7 +71,7 @@ type MessageItem struct { CreatedAt string `json:"created_at"` } -// InboxOutput returns the latest direct messages for an agent. +// InboxOutput returns the latest direct messages for one agent. // // brain.InboxOutput{Success: true, Messages: []brain.MessageItem{{ID: 1, From: "charon", To: "cladius"}}} type InboxOutput struct { @@ -79,7 +79,7 @@ type InboxOutput struct { Messages []MessageItem `json:"messages"` } -// ConversationInput selects the agent thread to load. +// ConversationInput selects the thread `agent_conversation` should load. // // brain.ConversationInput{Agent: "charon"} type ConversationInput struct { @@ -101,17 +101,19 @@ func (s *DirectSubsystem) sendMessage(ctx context.Context, _ *mcp.CallToolReques return nil, SendOutput{}, core.E("brain.sendMessage", "to and content are required", nil) } - result, err := s.apiCall(ctx, "POST", "/v1/messages/send", map[string]any{ + result := s.apiCall(ctx, "POST", "/v1/messages/send", map[string]any{ "to": input.To, "from": agentic.AgentName(), "content": input.Content, "subject": input.Subject, }) - if err != nil { + if !result.OK { + err, _ := result.Value.(error) return nil, SendOutput{}, err } - data, _ := result["data"].(map[string]any) + payload, _ := result.Value.(map[string]any) + data, _ := payload["data"].(map[string]any) id, _ := data["id"].(float64) return nil, SendOutput{ @@ -127,14 +129,15 @@ func (s *DirectSubsystem) inbox(ctx context.Context, _ *mcp.CallToolRequest, inp agent = agentic.AgentName() } // Agent names are validated identifiers — no URL escaping needed. - result, err := s.apiCall(ctx, "GET", core.Concat("/v1/messages/inbox?agent=", agent), nil) - if err != nil { + result := s.apiCall(ctx, "GET", core.Concat("/v1/messages/inbox?agent=", agent), nil) + if !result.OK { + err, _ := result.Value.(error) return nil, InboxOutput{}, err } return nil, InboxOutput{ Success: true, - Messages: parseMessages(result), + Messages: parseMessages(result.Value.(map[string]any)), }, nil } @@ -143,14 +146,15 @@ func (s *DirectSubsystem) conversation(ctx context.Context, _ *mcp.CallToolReque return nil, ConversationOutput{}, core.E("brain.conversation", "agent is required", nil) } - result, err := s.apiCall(ctx, "GET", core.Concat("/v1/messages/conversation/", input.Agent, "?me=", agentic.AgentName()), nil) - if err != nil { + result := s.apiCall(ctx, "GET", core.Concat("/v1/messages/conversation/", input.Agent, "?me=", agentic.AgentName()), nil) + if !result.OK { + err, _ := result.Value.(error) return nil, ConversationOutput{}, err } return nil, ConversationOutput{ Success: true, - Messages: parseMessages(result), + Messages: parseMessages(result.Value.(map[string]any)), }, nil } diff --git a/pkg/brain/provider.go b/pkg/brain/provider.go index 8b10e2b..45b113c 100644 --- a/pkg/brain/provider.go +++ b/pkg/brain/provider.go @@ -15,7 +15,7 @@ import ( // BrainProvider exposes the same OpenBrain bridge over HTTP routes and WS events. // // provider := brain.NewProvider(bridge, hub) -// provider.RegisterRoutes(router.Group("/api/brain")) +// core.Println(provider.BasePath()) // "/api/brain" type BrainProvider struct { bridge *ide.Bridge hub *ws.Hub @@ -39,7 +39,7 @@ const ( // NewProvider builds the HTTP provider around the IDE bridge and WS hub. // // p := brain.NewProvider(bridge, hub) -// p.RegisterRoutes(router.Group("/api/brain")) +// core.Println(p.BasePath()) func NewProvider(bridge *ide.Bridge, hub *ws.Hub) *BrainProvider { return &BrainProvider{ bridge: bridge, @@ -60,6 +60,7 @@ func (p *BrainProvider) BasePath() string { return "/api/brain" } // Channels lists the WS events emitted after brain actions complete. // // channels := p.Channels() +// core.Println(channels[0]) // "brain.remember.complete" func (p *BrainProvider) Channels() []string { return []string{ "brain.remember.complete", @@ -71,6 +72,7 @@ func (p *BrainProvider) Channels() []string { // Element describes the browser component that renders the brain panel. // // spec := p.Element() +// core.Println(spec.Tag) // "core-brain-panel" func (p *BrainProvider) Element() provider.ElementSpec { return provider.ElementSpec{ Tag: "core-brain-panel", @@ -92,6 +94,7 @@ func (p *BrainProvider) RegisterRoutes(rg *gin.RouterGroup) { // Describe returns the route contract used by API discovery and docs. // // routes := p.Describe() +// core.Println(routes[0].Path) // "/remember" func (p *BrainProvider) Describe() []api.RouteDescription { return []api.RouteDescription{ { diff --git a/pkg/brain/register.go b/pkg/brain/register.go index 91b064e..ec1c190 100644 --- a/pkg/brain/register.go +++ b/pkg/brain/register.go @@ -6,10 +6,11 @@ import ( core "dappco.re/go/core" ) -// Register exposes the direct OpenBrain subsystem through core.WithService. +// Register exposes the direct OpenBrain subsystem through `core.WithService`. // // c := core.New(core.WithService(brain.Register)) // sub, _ := core.ServiceFor[*brain.DirectSubsystem](c, "brain") +// core.Println(sub.Name()) // "brain" func Register(c *core.Core) core.Result { brn := NewDirect() return core.Result{Value: brn, OK: true} diff --git a/pkg/runner/paths.go b/pkg/runner/paths.go index 803cb9e..52c92de 100644 --- a/pkg/runner/paths.go +++ b/pkg/runner/paths.go @@ -26,9 +26,9 @@ func CoreRoot() string { return agentic.CoreRoot() } -// ReadStatus reads a workspace status.json. +// ReadStatus reads `status.json` from one workspace directory. // -// st, err := runner.ReadStatus("/path/to/workspace") +// st, err := runner.ReadStatus("/srv/core/workspace/core/go-io/task-5") func ReadStatus(wsDir string) (*WorkspaceStatus, error) { status, err := agentic.ReadStatus(wsDir) if err != nil { @@ -44,9 +44,9 @@ func ReadStatus(wsDir string) (*WorkspaceStatus, error) { return &st, nil } -// WriteStatus writes a workspace status.json. +// WriteStatus writes `status.json` for one workspace directory. // -// runner.WriteStatus(wsDir, &runner.WorkspaceStatus{Status: "running", Agent: "codex"}) +// runner.WriteStatus("/srv/core/workspace/core/go-io/task-5", &runner.WorkspaceStatus{Status: "running", Agent: "codex"}) func WriteStatus(wsDir string, st *WorkspaceStatus) { if st == nil { return diff --git a/pkg/runner/queue.go b/pkg/runner/queue.go index f6241e3..895f73d 100644 --- a/pkg/runner/queue.go +++ b/pkg/runner/queue.go @@ -12,18 +12,22 @@ import ( "gopkg.in/yaml.v3" ) -// DispatchConfig controls agent dispatch behaviour. +// DispatchConfig mirrors the `dispatch:` block in `agents.yaml`. // -// cfg := runner.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding"} +// cfg := runner.DispatchConfig{ +// DefaultAgent: "codex", DefaultTemplate: "coding", WorkspaceRoot: "/srv/core/workspace", +// } type DispatchConfig struct { DefaultAgent string `yaml:"default_agent"` DefaultTemplate string `yaml:"default_template"` WorkspaceRoot string `yaml:"workspace_root"` } -// RateConfig controls pacing between task dispatches. +// RateConfig mirrors one agent pool under `rates:` in `agents.yaml`. // -// rate := runner.RateConfig{ResetUTC: "06:00", SustainedDelay: 120} +// rate := runner.RateConfig{ +// ResetUTC: "06:00", DailyLimit: 200, SustainedDelay: 120, BurstWindow: 2, BurstDelay: 300, +// } type RateConfig struct { ResetUTC string `yaml:"reset_utc"` DailyLimit int `yaml:"daily_limit"` @@ -68,7 +72,12 @@ func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error { return nil } -// AgentsConfig is the root of agents.yaml. +// AgentsConfig mirrors the full `agents.yaml` file. +// +// cfg := runner.AgentsConfig{ +// Version: 1, +// Dispatch: runner.DispatchConfig{DefaultAgent: "codex", DefaultTemplate: "coding"}, +// } type AgentsConfig struct { Version int `yaml:"version"` Dispatch DispatchConfig `yaml:"dispatch"` @@ -76,7 +85,10 @@ type AgentsConfig struct { Rates map[string]RateConfig `yaml:"rates"` } -// loadAgentsConfig reads agents.yaml from known paths. +// loadAgentsConfig reads `agents.yaml` from the Core root. +// +// cfg := s.loadAgentsConfig() +// core.Println(cfg.Dispatch.DefaultAgent) func (s *Service) loadAgentsConfig() *AgentsConfig { paths := []string{ core.JoinPath(CoreRoot(), "agents.yaml"), @@ -165,7 +177,9 @@ func (s *Service) countRunningByAgent(agent string) int { return count } -// countRunningByModel counts running workspaces for a specific agent:model. +// countRunningByModel counts running workspaces for a specific `agent:model`. +// +// n := s.countRunningByModel("codex:gpt-5.4") func (s *Service) countRunningByModel(agent string) int { count := 0 s.workspaces.Each(func(_ string, st *WorkspaceStatus) { @@ -182,7 +196,9 @@ func (s *Service) countRunningByModel(agent string) int { return count } -// drainQueue fills available concurrency slots from queued workspaces. +// drainQueue fills any free concurrency slots from queued workspaces. +// +// s.drainQueue() func (s *Service) drainQueue() { if s.frozen { return @@ -221,8 +237,8 @@ func (s *Service) drainOne() bool { continue } - // Ask agentic to spawn — runner doesn't own the spawn logic, - // just the gate. Send IPC to trigger the actual spawn. + // Ask agentic to spawn — runner owns the gate, + // agentic owns the actual process launch. // Workspace name is relative path from workspace root (e.g. "core/go-ai/dev") wsName := agentic.WorkspaceName(wsDir) core.Info("drainOne: found queued workspace", "workspace", wsName, "agent", st.Agent) @@ -232,7 +248,7 @@ func (s *Service) drainOne() bool { continue } type spawner interface { - SpawnFromQueue(agent, prompt, wsDir string) (int, error) + SpawnFromQueue(agent, prompt, wsDir string) core.Result } prep, ok := core.ServiceFor[spawner](s.Core(), "agentic") if !ok { @@ -240,9 +256,14 @@ func (s *Service) drainOne() bool { continue } prompt := core.Concat("TASK: ", st.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.") - pid, err := prep.SpawnFromQueue(st.Agent, prompt, wsDir) - if err != nil { - core.Error("drainOne: spawn failed", "err", err) + spawnResult := prep.SpawnFromQueue(st.Agent, prompt, wsDir) + if !spawnResult.OK { + core.Error("drainOne: spawn failed", "workspace", wsName, "reason", core.Sprint(spawnResult.Value)) + continue + } + pid, ok := spawnResult.Value.(int) + if !ok { + core.Error("drainOne: spawn returned non-int pid", "workspace", wsName) continue }