fix(ax): align brain and runner result flows
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
54581dcbd7
commit
f9d36cab0b
9 changed files with 132 additions and 81 deletions
|
|
@ -14,10 +14,8 @@ import (
|
|||
// HandleIPCEvents implements Core's IPC handler interface.
|
||||
// Auto-registered by WithService — no manual wiring needed.
|
||||
//
|
||||
// Handles:
|
||||
//
|
||||
// AgentCompleted → ingest findings (runner handles channel push + queue poke)
|
||||
// SpawnQueued → runner asks agentic to spawn a queued workspace
|
||||
// _ = prep.HandleIPCEvents(c, messages.AgentCompleted{Workspace: "core/go-io/task-5"})
|
||||
// _ = prep.HandleIPCEvents(c, messages.SpawnQueued{Workspace: "core/go-io/task-5", Agent: "codex", Task: "fix tests"})
|
||||
func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
|
||||
switch ev := msg.(type) {
|
||||
case messages.AgentCompleted:
|
||||
|
|
@ -29,7 +27,6 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res
|
|||
}
|
||||
|
||||
case messages.SpawnQueued:
|
||||
// Runner asks agentic to spawn a queued workspace
|
||||
// Runner asks agentic to spawn a queued workspace
|
||||
wsDir := resolveWorkspace(ev.Workspace)
|
||||
if wsDir == "" {
|
||||
|
|
@ -57,10 +54,16 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res
|
|||
// SpawnFromQueue spawns an agent in a pre-prepped workspace.
|
||||
// Called by runner.Service via ServiceFor interface matching.
|
||||
//
|
||||
// pid, err := prep.SpawnFromQueue("codex", prompt, wsDir)
|
||||
func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, wsDir string) (int, error) {
|
||||
// r := prep.SpawnFromQueue("codex", prompt, wsDir)
|
||||
// pid := r.Value.(int)
|
||||
func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, wsDir string) core.Result {
|
||||
pid, _, err := s.spawnAgent(agent, prompt, wsDir)
|
||||
return pid, err
|
||||
if err != nil {
|
||||
return core.Result{
|
||||
Value: core.E("agentic.SpawnFromQueue", "failed to spawn queued agent", err),
|
||||
}
|
||||
}
|
||||
return core.Result{Value: pid, OK: true}
|
||||
}
|
||||
|
||||
// resolveWorkspace converts a workspace name back to the full path.
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
// Package brain gives MCP and HTTP services a shared OpenBrain surface.
|
||||
// Package brain gives MCP and HTTP services the same OpenBrain capability map.
|
||||
//
|
||||
// sub := brain.New(bridge)
|
||||
// sub.RegisterTools(server)
|
||||
// sub := brain.New(nil)
|
||||
// core.Println(sub.Name())
|
||||
package brain
|
||||
|
||||
import (
|
||||
|
|
@ -31,18 +31,18 @@ func fieldString(values map[string]any, key string) string {
|
|||
// but it has not been initialised (headless mode).
|
||||
var errBridgeNotAvailable = core.E("brain", "bridge not available", nil)
|
||||
|
||||
// Subsystem routes brain_* MCP tools through the shared IDE bridge.
|
||||
// Subsystem routes `brain_*` MCP tools through the shared IDE bridge.
|
||||
//
|
||||
// sub := brain.New(bridge)
|
||||
// sub.RegisterTools(server)
|
||||
// sub := brain.New(nil)
|
||||
// core.Println(sub.Name()) // "brain"
|
||||
type Subsystem struct {
|
||||
bridge *ide.Bridge
|
||||
}
|
||||
|
||||
// New builds the bridge-backed OpenBrain subsystem.
|
||||
// New builds the bridge-backed OpenBrain subsystem used by MCP.
|
||||
//
|
||||
// sub := brain.New(bridge)
|
||||
// _ = sub.Shutdown(context.Background())
|
||||
// sub := brain.New(nil)
|
||||
// core.Println(sub.Name())
|
||||
func New(bridge *ide.Bridge) *Subsystem {
|
||||
return &Subsystem{bridge: bridge}
|
||||
}
|
||||
|
|
@ -54,7 +54,7 @@ func (s *Subsystem) Name() string { return "brain" }
|
|||
|
||||
// RegisterTools publishes the bridge-backed brain tools on an MCP server.
|
||||
//
|
||||
// sub := brain.New(bridge)
|
||||
// sub := brain.New(nil)
|
||||
// sub.RegisterTools(server)
|
||||
func (s *Subsystem) RegisterTools(server *mcp.Server) {
|
||||
s.registerBrainTools(server)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import (
|
|||
// DirectSubsystem talks to OpenBrain over HTTP without the IDE bridge.
|
||||
//
|
||||
// sub := brain.NewDirect()
|
||||
// sub.RegisterTools(server)
|
||||
// core.Println(sub.Name()) // "brain"
|
||||
type DirectSubsystem struct {
|
||||
apiURL string
|
||||
apiKey string
|
||||
|
|
@ -26,7 +26,7 @@ var _ coremcp.Subsystem = (*DirectSubsystem)(nil)
|
|||
// NewDirect builds the HTTP-backed OpenBrain subsystem.
|
||||
//
|
||||
// sub := brain.NewDirect()
|
||||
// sub.RegisterTools(server)
|
||||
// core.Println(sub.Name())
|
||||
func NewDirect() *DirectSubsystem {
|
||||
apiURL := core.Env("CORE_BRAIN_URL")
|
||||
if apiURL == "" {
|
||||
|
|
@ -61,7 +61,7 @@ func NewDirect() *DirectSubsystem {
|
|||
// name := sub.Name() // "brain"
|
||||
func (s *DirectSubsystem) Name() string { return "brain" }
|
||||
|
||||
// RegisterTools publishes the direct brain_* and agent_* tools on an MCP server.
|
||||
// RegisterTools publishes the direct `brain_*` and `agent_*` tools on an MCP server.
|
||||
//
|
||||
// sub := brain.NewDirect()
|
||||
// sub.RegisterTools(server)
|
||||
|
|
@ -104,9 +104,11 @@ func brainHomeDir() string {
|
|||
return core.Env("DIR_HOME")
|
||||
}
|
||||
|
||||
func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body any) (map[string]any, error) {
|
||||
func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body any) core.Result {
|
||||
if s.apiKey == "" {
|
||||
return nil, core.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil)
|
||||
return core.Result{
|
||||
Value: core.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil),
|
||||
}
|
||||
}
|
||||
|
||||
requestURL := core.Concat(s.apiURL, path)
|
||||
|
|
@ -117,20 +119,20 @@ func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body
|
|||
r := agentic.HTTPDo(ctx, method, requestURL, bodyStr, s.apiKey, "Bearer")
|
||||
if !r.OK {
|
||||
core.Error("brain API call failed", "method", method, "path", path)
|
||||
return nil, core.E("brain.apiCall", "API call failed", nil)
|
||||
return core.Result{Value: core.E("brain.apiCall", "API call failed", nil)}
|
||||
}
|
||||
|
||||
var result map[string]any
|
||||
if ur := core.JSONUnmarshalString(r.Value.(string), &result); !ur.OK {
|
||||
core.Error("brain API response parse failed", "method", method, "path", path)
|
||||
return nil, core.E("brain.apiCall", "parse response", nil)
|
||||
return core.Result{Value: core.E("brain.apiCall", "parse response", nil)}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
return core.Result{Value: result, OK: true}
|
||||
}
|
||||
|
||||
func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) {
|
||||
result, err := s.apiCall(ctx, "POST", "/v1/brain/remember", map[string]any{
|
||||
result := s.apiCall(ctx, "POST", "/v1/brain/remember", map[string]any{
|
||||
"content": input.Content,
|
||||
"type": input.Type,
|
||||
"tags": input.Tags,
|
||||
|
|
@ -140,11 +142,13 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest,
|
|||
"expires_in": input.ExpiresIn,
|
||||
"agent_id": agentic.AgentName(),
|
||||
})
|
||||
if err != nil {
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return nil, RememberOutput{}, err
|
||||
}
|
||||
|
||||
id, _ := result["id"].(string)
|
||||
payload, _ := result.Value.(map[string]any)
|
||||
id, _ := payload["id"].(string)
|
||||
return nil, RememberOutput{
|
||||
Success: true,
|
||||
MemoryID: id,
|
||||
|
|
@ -174,13 +178,15 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in
|
|||
body["top_k"] = 10
|
||||
}
|
||||
|
||||
result, err := s.apiCall(ctx, "POST", "/v1/brain/recall", body)
|
||||
if err != nil {
|
||||
result := s.apiCall(ctx, "POST", "/v1/brain/recall", body)
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return nil, RecallOutput{}, err
|
||||
}
|
||||
|
||||
var memories []Memory
|
||||
if mems, ok := result["memories"].([]any); ok {
|
||||
payload, _ := result.Value.(map[string]any)
|
||||
if mems, ok := payload["memories"].([]any); ok {
|
||||
for _, m := range mems {
|
||||
if mm, ok := m.(map[string]any); ok {
|
||||
mem := Memory{
|
||||
|
|
@ -217,8 +223,9 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in
|
|||
}
|
||||
|
||||
func (s *DirectSubsystem) forget(ctx context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) {
|
||||
_, err := s.apiCall(ctx, "DELETE", core.Concat("/v1/brain/forget/", input.ID), nil)
|
||||
if err != nil {
|
||||
result := s.apiCall(ctx, "DELETE", core.Concat("/v1/brain/forget/", input.ID), nil)
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return nil, ForgetOutput{}, err
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -82,7 +82,9 @@ func TestDirect_Subsystem_Good_Shutdown(t *testing.T) {
|
|||
|
||||
func TestDirect_ApiCall_Bad_NoAPIKey(t *testing.T) {
|
||||
sub := &DirectSubsystem{apiURL: "http://localhost", apiKey: ""}
|
||||
_, err := sub.apiCall(context.Background(), "GET", "/test", nil)
|
||||
result := sub.apiCall(context.Background(), "GET", "/test", nil)
|
||||
require.False(t, result.OK)
|
||||
err, _ := result.Value.(error)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "no API key")
|
||||
}
|
||||
|
|
@ -99,9 +101,10 @@ func TestDirect_ApiCall_Good_GET(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
result, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "ok", result["status"])
|
||||
result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
require.True(t, result.OK)
|
||||
payload, _ := result.Value.(map[string]any)
|
||||
assert.Equal(t, "ok", payload["status"])
|
||||
}
|
||||
|
||||
func TestDirect_ApiCall_Good_POSTWithBody(t *testing.T) {
|
||||
|
|
@ -118,16 +121,19 @@ func TestDirect_ApiCall_Good_POSTWithBody(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
result, err := newTestDirect(srv).apiCall(context.Background(), "POST", "/v1/brain/remember", map[string]any{"content": "hello"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "mem-123", result["id"])
|
||||
result := newTestDirect(srv).apiCall(context.Background(), "POST", "/v1/brain/remember", map[string]any{"content": "hello"})
|
||||
require.True(t, result.OK)
|
||||
payload, _ := result.Value.(map[string]any)
|
||||
assert.Equal(t, "mem-123", payload["id"])
|
||||
}
|
||||
|
||||
func TestDirect_ApiCall_Bad_ServerError(t *testing.T) {
|
||||
srv := httptest.NewServer(errorHandler(http.StatusInternalServerError, `{"error":"internal"}`))
|
||||
defer srv.Close()
|
||||
|
||||
_, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
require.False(t, result.OK)
|
||||
err, _ := result.Value.(error)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "API call failed")
|
||||
}
|
||||
|
|
@ -139,14 +145,18 @@ func TestDirect_ApiCall_Bad_InvalidJSON(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
_, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
require.False(t, result.OK)
|
||||
err, _ := result.Value.(error)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "parse response")
|
||||
}
|
||||
|
||||
func TestDirect_ApiCall_Bad_ConnectionRefused(t *testing.T) {
|
||||
sub := &DirectSubsystem{apiURL: "http://127.0.0.1:1", apiKey: "test-key"}
|
||||
_, err := sub.apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
result := sub.apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
require.False(t, result.OK)
|
||||
err, _ := result.Value.(error)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "API call failed")
|
||||
}
|
||||
|
|
@ -155,7 +165,9 @@ func TestDirect_ApiCall_Bad_BadRequest(t *testing.T) {
|
|||
srv := httptest.NewServer(errorHandler(http.StatusBadRequest, `{"error":"bad input"}`))
|
||||
defer srv.Close()
|
||||
|
||||
_, err := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
result := newTestDirect(srv).apiCall(context.Background(), "GET", "/v1/test", nil)
|
||||
require.False(t, result.OK)
|
||||
err, _ := result.Value.(error)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "API call failed")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ func (s *DirectSubsystem) RegisterMessagingTools(server *mcp.Server) {
|
|||
|
||||
// Input/Output types
|
||||
|
||||
// SendInput sends a direct message to another agent.
|
||||
// SendInput is the payload for `agent_send`.
|
||||
//
|
||||
// brain.SendInput{To: "charon", Subject: "status update", Content: "deploy complete"}
|
||||
type SendInput struct {
|
||||
|
|
@ -42,7 +42,7 @@ type SendInput struct {
|
|||
Subject string `json:"subject,omitempty"`
|
||||
}
|
||||
|
||||
// SendOutput reports the created direct message.
|
||||
// SendOutput reports the stored direct message.
|
||||
//
|
||||
// brain.SendOutput{Success: true, ID: 42, To: "charon"}
|
||||
type SendOutput struct {
|
||||
|
|
@ -51,14 +51,14 @@ type SendOutput struct {
|
|||
To string `json:"to"`
|
||||
}
|
||||
|
||||
// InboxInput selects which agent inbox to read.
|
||||
// InboxInput selects which inbox `agent_inbox` should read.
|
||||
//
|
||||
// brain.InboxInput{Agent: "cladius"}
|
||||
type InboxInput struct {
|
||||
Agent string `json:"agent,omitempty"`
|
||||
}
|
||||
|
||||
// MessageItem is one inbox or conversation message.
|
||||
// MessageItem is one inbox or conversation entry.
|
||||
//
|
||||
// brain.MessageItem{ID: 7, From: "cladius", To: "charon", Content: "all green"}
|
||||
type MessageItem struct {
|
||||
|
|
@ -71,7 +71,7 @@ type MessageItem struct {
|
|||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// InboxOutput returns the latest direct messages for an agent.
|
||||
// InboxOutput returns the latest direct messages for one agent.
|
||||
//
|
||||
// brain.InboxOutput{Success: true, Messages: []brain.MessageItem{{ID: 1, From: "charon", To: "cladius"}}}
|
||||
type InboxOutput struct {
|
||||
|
|
@ -79,7 +79,7 @@ type InboxOutput struct {
|
|||
Messages []MessageItem `json:"messages"`
|
||||
}
|
||||
|
||||
// ConversationInput selects the agent thread to load.
|
||||
// ConversationInput selects the thread `agent_conversation` should load.
|
||||
//
|
||||
// brain.ConversationInput{Agent: "charon"}
|
||||
type ConversationInput struct {
|
||||
|
|
@ -101,17 +101,19 @@ func (s *DirectSubsystem) sendMessage(ctx context.Context, _ *mcp.CallToolReques
|
|||
return nil, SendOutput{}, core.E("brain.sendMessage", "to and content are required", nil)
|
||||
}
|
||||
|
||||
result, err := s.apiCall(ctx, "POST", "/v1/messages/send", map[string]any{
|
||||
result := s.apiCall(ctx, "POST", "/v1/messages/send", map[string]any{
|
||||
"to": input.To,
|
||||
"from": agentic.AgentName(),
|
||||
"content": input.Content,
|
||||
"subject": input.Subject,
|
||||
})
|
||||
if err != nil {
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return nil, SendOutput{}, err
|
||||
}
|
||||
|
||||
data, _ := result["data"].(map[string]any)
|
||||
payload, _ := result.Value.(map[string]any)
|
||||
data, _ := payload["data"].(map[string]any)
|
||||
id, _ := data["id"].(float64)
|
||||
|
||||
return nil, SendOutput{
|
||||
|
|
@ -127,14 +129,15 @@ func (s *DirectSubsystem) inbox(ctx context.Context, _ *mcp.CallToolRequest, inp
|
|||
agent = agentic.AgentName()
|
||||
}
|
||||
// Agent names are validated identifiers — no URL escaping needed.
|
||||
result, err := s.apiCall(ctx, "GET", core.Concat("/v1/messages/inbox?agent=", agent), nil)
|
||||
if err != nil {
|
||||
result := s.apiCall(ctx, "GET", core.Concat("/v1/messages/inbox?agent=", agent), nil)
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return nil, InboxOutput{}, err
|
||||
}
|
||||
|
||||
return nil, InboxOutput{
|
||||
Success: true,
|
||||
Messages: parseMessages(result),
|
||||
Messages: parseMessages(result.Value.(map[string]any)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -143,14 +146,15 @@ func (s *DirectSubsystem) conversation(ctx context.Context, _ *mcp.CallToolReque
|
|||
return nil, ConversationOutput{}, core.E("brain.conversation", "agent is required", nil)
|
||||
}
|
||||
|
||||
result, err := s.apiCall(ctx, "GET", core.Concat("/v1/messages/conversation/", input.Agent, "?me=", agentic.AgentName()), nil)
|
||||
if err != nil {
|
||||
result := s.apiCall(ctx, "GET", core.Concat("/v1/messages/conversation/", input.Agent, "?me=", agentic.AgentName()), nil)
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
return nil, ConversationOutput{}, err
|
||||
}
|
||||
|
||||
return nil, ConversationOutput{
|
||||
Success: true,
|
||||
Messages: parseMessages(result),
|
||||
Messages: parseMessages(result.Value.(map[string]any)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import (
|
|||
// BrainProvider exposes the same OpenBrain bridge over HTTP routes and WS events.
|
||||
//
|
||||
// provider := brain.NewProvider(bridge, hub)
|
||||
// provider.RegisterRoutes(router.Group("/api/brain"))
|
||||
// core.Println(provider.BasePath()) // "/api/brain"
|
||||
type BrainProvider struct {
|
||||
bridge *ide.Bridge
|
||||
hub *ws.Hub
|
||||
|
|
@ -39,7 +39,7 @@ const (
|
|||
// NewProvider builds the HTTP provider around the IDE bridge and WS hub.
|
||||
//
|
||||
// p := brain.NewProvider(bridge, hub)
|
||||
// p.RegisterRoutes(router.Group("/api/brain"))
|
||||
// core.Println(p.BasePath())
|
||||
func NewProvider(bridge *ide.Bridge, hub *ws.Hub) *BrainProvider {
|
||||
return &BrainProvider{
|
||||
bridge: bridge,
|
||||
|
|
@ -60,6 +60,7 @@ func (p *BrainProvider) BasePath() string { return "/api/brain" }
|
|||
// Channels lists the WS events emitted after brain actions complete.
|
||||
//
|
||||
// channels := p.Channels()
|
||||
// core.Println(channels[0]) // "brain.remember.complete"
|
||||
func (p *BrainProvider) Channels() []string {
|
||||
return []string{
|
||||
"brain.remember.complete",
|
||||
|
|
@ -71,6 +72,7 @@ func (p *BrainProvider) Channels() []string {
|
|||
// Element describes the browser component that renders the brain panel.
|
||||
//
|
||||
// spec := p.Element()
|
||||
// core.Println(spec.Tag) // "core-brain-panel"
|
||||
func (p *BrainProvider) Element() provider.ElementSpec {
|
||||
return provider.ElementSpec{
|
||||
Tag: "core-brain-panel",
|
||||
|
|
@ -92,6 +94,7 @@ func (p *BrainProvider) RegisterRoutes(rg *gin.RouterGroup) {
|
|||
// Describe returns the route contract used by API discovery and docs.
|
||||
//
|
||||
// routes := p.Describe()
|
||||
// core.Println(routes[0].Path) // "/remember"
|
||||
func (p *BrainProvider) Describe() []api.RouteDescription {
|
||||
return []api.RouteDescription{
|
||||
{
|
||||
|
|
|
|||
|
|
@ -6,10 +6,11 @@ import (
|
|||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// Register exposes the direct OpenBrain subsystem through core.WithService.
|
||||
// Register exposes the direct OpenBrain subsystem through `core.WithService`.
|
||||
//
|
||||
// c := core.New(core.WithService(brain.Register))
|
||||
// sub, _ := core.ServiceFor[*brain.DirectSubsystem](c, "brain")
|
||||
// core.Println(sub.Name()) // "brain"
|
||||
func Register(c *core.Core) core.Result {
|
||||
brn := NewDirect()
|
||||
return core.Result{Value: brn, OK: true}
|
||||
|
|
|
|||
|
|
@ -26,9 +26,9 @@ func CoreRoot() string {
|
|||
return agentic.CoreRoot()
|
||||
}
|
||||
|
||||
// ReadStatus reads a workspace status.json.
|
||||
// ReadStatus reads `status.json` from one workspace directory.
|
||||
//
|
||||
// st, err := runner.ReadStatus("/path/to/workspace")
|
||||
// st, err := runner.ReadStatus("/srv/core/workspace/core/go-io/task-5")
|
||||
func ReadStatus(wsDir string) (*WorkspaceStatus, error) {
|
||||
status, err := agentic.ReadStatus(wsDir)
|
||||
if err != nil {
|
||||
|
|
@ -44,9 +44,9 @@ func ReadStatus(wsDir string) (*WorkspaceStatus, error) {
|
|||
return &st, nil
|
||||
}
|
||||
|
||||
// WriteStatus writes a workspace status.json.
|
||||
// WriteStatus writes `status.json` for one workspace directory.
|
||||
//
|
||||
// runner.WriteStatus(wsDir, &runner.WorkspaceStatus{Status: "running", Agent: "codex"})
|
||||
// runner.WriteStatus("/srv/core/workspace/core/go-io/task-5", &runner.WorkspaceStatus{Status: "running", Agent: "codex"})
|
||||
func WriteStatus(wsDir string, st *WorkspaceStatus) {
|
||||
if st == nil {
|
||||
return
|
||||
|
|
|
|||
|
|
@ -12,18 +12,22 @@ import (
|
|||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// DispatchConfig controls agent dispatch behaviour.
|
||||
// DispatchConfig mirrors the `dispatch:` block in `agents.yaml`.
|
||||
//
|
||||
// cfg := runner.DispatchConfig{DefaultAgent: "claude", DefaultTemplate: "coding"}
|
||||
// cfg := runner.DispatchConfig{
|
||||
// DefaultAgent: "codex", DefaultTemplate: "coding", WorkspaceRoot: "/srv/core/workspace",
|
||||
// }
|
||||
type DispatchConfig struct {
|
||||
DefaultAgent string `yaml:"default_agent"`
|
||||
DefaultTemplate string `yaml:"default_template"`
|
||||
WorkspaceRoot string `yaml:"workspace_root"`
|
||||
}
|
||||
|
||||
// RateConfig controls pacing between task dispatches.
|
||||
// RateConfig mirrors one agent pool under `rates:` in `agents.yaml`.
|
||||
//
|
||||
// rate := runner.RateConfig{ResetUTC: "06:00", SustainedDelay: 120}
|
||||
// rate := runner.RateConfig{
|
||||
// ResetUTC: "06:00", DailyLimit: 200, SustainedDelay: 120, BurstWindow: 2, BurstDelay: 300,
|
||||
// }
|
||||
type RateConfig struct {
|
||||
ResetUTC string `yaml:"reset_utc"`
|
||||
DailyLimit int `yaml:"daily_limit"`
|
||||
|
|
@ -68,7 +72,12 @@ func (c *ConcurrencyLimit) UnmarshalYAML(value *yaml.Node) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// AgentsConfig is the root of agents.yaml.
|
||||
// AgentsConfig mirrors the full `agents.yaml` file.
|
||||
//
|
||||
// cfg := runner.AgentsConfig{
|
||||
// Version: 1,
|
||||
// Dispatch: runner.DispatchConfig{DefaultAgent: "codex", DefaultTemplate: "coding"},
|
||||
// }
|
||||
type AgentsConfig struct {
|
||||
Version int `yaml:"version"`
|
||||
Dispatch DispatchConfig `yaml:"dispatch"`
|
||||
|
|
@ -76,7 +85,10 @@ type AgentsConfig struct {
|
|||
Rates map[string]RateConfig `yaml:"rates"`
|
||||
}
|
||||
|
||||
// loadAgentsConfig reads agents.yaml from known paths.
|
||||
// loadAgentsConfig reads `agents.yaml` from the Core root.
|
||||
//
|
||||
// cfg := s.loadAgentsConfig()
|
||||
// core.Println(cfg.Dispatch.DefaultAgent)
|
||||
func (s *Service) loadAgentsConfig() *AgentsConfig {
|
||||
paths := []string{
|
||||
core.JoinPath(CoreRoot(), "agents.yaml"),
|
||||
|
|
@ -165,7 +177,9 @@ func (s *Service) countRunningByAgent(agent string) int {
|
|||
return count
|
||||
}
|
||||
|
||||
// countRunningByModel counts running workspaces for a specific agent:model.
|
||||
// countRunningByModel counts running workspaces for a specific `agent:model`.
|
||||
//
|
||||
// n := s.countRunningByModel("codex:gpt-5.4")
|
||||
func (s *Service) countRunningByModel(agent string) int {
|
||||
count := 0
|
||||
s.workspaces.Each(func(_ string, st *WorkspaceStatus) {
|
||||
|
|
@ -182,7 +196,9 @@ func (s *Service) countRunningByModel(agent string) int {
|
|||
return count
|
||||
}
|
||||
|
||||
// drainQueue fills available concurrency slots from queued workspaces.
|
||||
// drainQueue fills any free concurrency slots from queued workspaces.
|
||||
//
|
||||
// s.drainQueue()
|
||||
func (s *Service) drainQueue() {
|
||||
if s.frozen {
|
||||
return
|
||||
|
|
@ -221,8 +237,8 @@ func (s *Service) drainOne() bool {
|
|||
continue
|
||||
}
|
||||
|
||||
// Ask agentic to spawn — runner doesn't own the spawn logic,
|
||||
// just the gate. Send IPC to trigger the actual spawn.
|
||||
// Ask agentic to spawn — runner owns the gate,
|
||||
// agentic owns the actual process launch.
|
||||
// Workspace name is relative path from workspace root (e.g. "core/go-ai/dev")
|
||||
wsName := agentic.WorkspaceName(wsDir)
|
||||
core.Info("drainOne: found queued workspace", "workspace", wsName, "agent", st.Agent)
|
||||
|
|
@ -232,7 +248,7 @@ func (s *Service) drainOne() bool {
|
|||
continue
|
||||
}
|
||||
type spawner interface {
|
||||
SpawnFromQueue(agent, prompt, wsDir string) (int, error)
|
||||
SpawnFromQueue(agent, prompt, wsDir string) core.Result
|
||||
}
|
||||
prep, ok := core.ServiceFor[spawner](s.Core(), "agentic")
|
||||
if !ok {
|
||||
|
|
@ -240,9 +256,14 @@ func (s *Service) drainOne() bool {
|
|||
continue
|
||||
}
|
||||
prompt := core.Concat("TASK: ", st.Task, "\n\nResume from where you left off. Read CODEX.md for conventions. Commit when done.")
|
||||
pid, err := prep.SpawnFromQueue(st.Agent, prompt, wsDir)
|
||||
if err != nil {
|
||||
core.Error("drainOne: spawn failed", "err", err)
|
||||
spawnResult := prep.SpawnFromQueue(st.Agent, prompt, wsDir)
|
||||
if !spawnResult.OK {
|
||||
core.Error("drainOne: spawn failed", "workspace", wsName, "reason", core.Sprint(spawnResult.Value))
|
||||
continue
|
||||
}
|
||||
pid, ok := spawnResult.Value.(int)
|
||||
if !ok {
|
||||
core.Error("drainOne: spawn returned non-int pid", "workspace", wsName)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue