feat(agent/brain): adopt shared T1 client + propagate org through actions (#177)

#177 (T3/5 — direct subsystem adopts shared client):
- pkg/brain/direct.go: HTTP transport now delegates to shared T1 client
  in core/mcp's pkg/mcp/brain/client (retry, circuit breaker, org propagation)
- pkg/brain/actions.go: org now survives from action options through
  remember/recall/list calls
- pkg/brain/direct_test.go + actions_test.go: tests updated for org propagation

Tickets deferred:
- #179 (T5/5 — cross-runtime contract test + BRAIN-CALLERS.md): needs
  cross-repo edits to mcp + external runtime consumers
- #180 (lift RFC-OPENBRAIN features into vendored BrainService):
  base schema lacks memory_scope; no agentBoot, brain:consolidate,
  agent-context endpoint, or lifecycle events present

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=177
This commit is contained in:
Snider 2026-04-25 16:22:36 +01:00
parent 6832d40587
commit 6be6cb095c
4 changed files with 92 additions and 41 deletions

View file

@ -42,6 +42,7 @@ func (s *DirectSubsystem) handleRemember(ctx context.Context, options core.Optio
Content: actionStringValue(options, "content"),
Type: actionStringValue(options, "type"),
Tags: actionStringSliceValue(options, "tags"),
Org: actionStringValue(options, "org"),
Project: actionStringValue(options, "project"),
Confidence: actionFloatValue(options, "confidence"),
Supersedes: actionStringValue(options, "supersedes"),
@ -98,6 +99,7 @@ func (s *DirectSubsystem) handleForget(ctx context.Context, options core.Options
// ))
func (s *DirectSubsystem) handleList(ctx context.Context, options core.Options) core.Result {
input := ListInput{
Org: actionStringValue(options, "org"),
Project: actionStringValue(options, "project"),
Type: actionStringValue(options, "type"),
AgentID: actionStringValue(options, "agent_id", "agent"),
@ -163,6 +165,9 @@ func (s *DirectSubsystem) handleConversation(ctx context.Context, options core.O
func recallFilterFromOptions(options core.Options) RecallFilter {
filter := recallFilterValue(actionOptionValue(options, "filter"))
if filter.Org == "" {
filter.Org = actionStringValue(options, "org")
}
if filter.Project == "" {
filter.Project = actionStringValue(options, "project")
}
@ -187,6 +192,7 @@ func recallFilterValue(value any) RecallFilter {
Project: actionStringFromAny(typed["project"]),
Type: typed["type"],
AgentID: actionStringFromAny(typed["agent_id"]),
Org: actionStringFromAny(typed["org"]),
MinConfidence: actionFloatFromAny(typed["min_confidence"]),
}
case map[string]string:
@ -194,6 +200,7 @@ func recallFilterValue(value any) RecallFilter {
Project: actionStringFromAny(typed["project"]),
Type: typed["type"],
AgentID: actionStringFromAny(typed["agent_id"]),
Org: actionStringFromAny(typed["org"]),
}
default:
if text := actionStringFromAny(value); text != "" {

View file

@ -37,6 +37,7 @@ func TestActions_HandleList_Good(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "GET", r.Method)
assert.Equal(t, "/v1/brain/list", r.URL.Path)
assert.Equal(t, "core", r.URL.Query().Get("org"))
assert.Equal(t, "agent", r.URL.Query().Get("project"))
assert.Equal(t, "decision", r.URL.Query().Get("type"))
assert.Equal(t, "cladius", r.URL.Query().Get("agent_id"))
@ -69,6 +70,7 @@ func TestActions_HandleList_Good(t *testing.T) {
require.True(t, result.OK)
actionResult := c.Action("brain.list").Run(context.Background(), core.NewOptions(
core.Option{Key: "org", Value: "core"},
core.Option{Key: "project", Value: "agent"},
core.Option{Key: "type", Value: "decision"},
core.Option{Key: "agent_id", Value: "cladius"},
@ -103,7 +105,7 @@ func TestActions_HandleList_Bad(t *testing.T) {
require.False(t, actionResult.OK)
err, ok := actionResult.Value.(error)
require.True(t, ok)
assert.Contains(t, err.Error(), "API call failed")
assert.Contains(t, err.Error(), "upstream returned 500")
}
func TestActions_HandleRecall_Ugly_FilterMap(t *testing.T) {
@ -115,6 +117,7 @@ func TestActions_HandleRecall_Ugly_FilterMap(t *testing.T) {
require.True(t, core.JSONUnmarshalString(core.ReadAll(r.Body).Value.(string), &body).OK)
assert.Equal(t, "architecture", body["query"])
assert.Equal(t, float64(3), body["top_k"])
assert.Equal(t, "core", body["org"])
assert.Equal(t, "agent", body["project"])
assert.Equal(t, "decision", body["type"])
assert.Equal(t, "clotho", body["agent_id"])
@ -136,6 +139,7 @@ func TestActions_HandleRecall_Ugly_FilterMap(t *testing.T) {
core.Option{Key: "query", Value: "architecture"},
core.Option{Key: "top_k", Value: 3},
core.Option{Key: "filter", Value: map[string]any{
"org": "core",
"project": "agent",
"type": "decision",
"agent_id": "clotho",

View file

@ -9,6 +9,7 @@ import (
"dappco.re/go/agent/pkg/agentic"
core "dappco.re/go/core"
coremcp "dappco.re/go/mcp/pkg/mcp"
brainclient "dappco.re/go/mcp/pkg/mcp/brain/client"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
@ -16,8 +17,9 @@ import (
// core.Println(subsystem.Name()) // "brain"
type DirectSubsystem struct {
*core.ServiceRuntime[DirectOptions]
apiURL string
apiKey string
apiURL string
apiKey string
apiClient *brainclient.Client
}
var _ coremcp.Subsystem = (*DirectSubsystem)(nil)
@ -48,8 +50,9 @@ func NewDirect() *DirectSubsystem {
}
return &DirectSubsystem{
apiURL: apiURL,
apiKey: apiKey,
apiURL: apiURL,
apiKey: apiKey,
apiClient: newBrainClient(apiURL, apiKey),
}
}
@ -93,37 +96,10 @@ func brainKeyPath(home string) string {
}
func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body any) core.Result {
if s.apiKey == "" {
return core.Result{
Value: core.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil),
OK: false,
}
result, err := s.client().Call(ctx, method, path, body)
if err != nil {
return core.Result{Value: err, OK: false}
}
requestURL := core.Concat(s.apiURL, path)
var bodyStr string
if body != nil {
bodyStr = core.JSONMarshalString(body)
}
requestResult := agentic.HTTPDo(ctx, method, requestURL, bodyStr, s.apiKey, "Bearer")
if !requestResult.OK {
core.Error("brain API call failed", "method", method, "path", path)
if err, ok := requestResult.Value.(error); ok {
return core.Result{Value: core.E("brain.apiCall", "API call failed", err), OK: false}
}
if responseBody, ok := requestResult.Value.(string); ok && responseBody != "" {
return core.Result{Value: core.E("brain.apiCall", core.Concat("API call failed: ", core.Trim(responseBody)), nil), OK: false}
}
return core.Result{Value: core.E("brain.apiCall", "API call failed", nil), OK: false}
}
var result map[string]any
if parseResult := core.JSONUnmarshalString(requestResult.Value.(string), &result); !parseResult.OK {
core.Error("brain API response parse failed", "method", method, "path", path)
err, _ := parseResult.Value.(error)
return core.Result{Value: core.E("brain.apiCall", "parse response", err), OK: false}
}
return core.Result{Value: result, OK: true}
}
@ -132,11 +108,12 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest,
"content": input.Content,
"type": input.Type,
"tags": input.Tags,
"org": input.Org,
"project": input.Project,
"confidence": input.Confidence,
"supersedes": input.Supersedes,
"expires_in": input.ExpiresIn,
"agent_id": agentic.AgentName(),
"agent_id": directAgentID(),
})
if !result.OK {
err, _ := result.Value.(error)
@ -162,6 +139,9 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in
if input.Filter.Project != "" {
body["project"] = input.Filter.Project
}
if input.Filter.Org != "" {
body["org"] = input.Filter.Org
}
if input.Filter.Type != nil {
body["type"] = input.Filter.Type
}
@ -204,6 +184,9 @@ func (s *DirectSubsystem) forget(ctx context.Context, _ *mcp.CallToolRequest, in
func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, input ListInput) (*mcp.CallToolResult, ListOutput, error) {
var params []string
if input.Org != "" {
params = append(params, core.Concat("org=", core.URLEncode(input.Org)))
}
if input.Project != "" {
params = append(params, core.Concat("project=", core.URLEncode(input.Project)))
}
@ -238,6 +221,29 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu
}, nil
}
func (s *DirectSubsystem) client() *brainclient.Client {
if s.apiClient == nil {
s.apiClient = newBrainClient(s.apiURL, s.apiKey)
}
return s.apiClient
}
func newBrainClient(apiURL, apiKey string) *brainclient.Client {
return brainclient.New(brainclient.Options{
URL: apiURL,
Key: apiKey,
Org: core.Trim(core.Env("CORE_BRAIN_ORG")),
AgentID: directAgentID(),
})
}
func directAgentID() string {
if configured := core.Trim(core.Env("CORE_BRAIN_AGENT_ID")); configured != "" {
return configured
}
return agentic.AgentName()
}
func memoriesFromPayload(payload map[string]any) []Memory {
var memories []Memory
source := payloadMap(payload)

View file

@ -7,15 +7,27 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
core "dappco.re/go/core"
brainclient "dappco.re/go/mcp/pkg/mcp/brain/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// newTestDirect returns a DirectSubsystem wired to the given test server.
func newTestDirect(srv *httptest.Server) *DirectSubsystem {
return &DirectSubsystem{apiURL: srv.URL, apiKey: "test-key"}
return &DirectSubsystem{
apiURL: srv.URL,
apiKey: "test-key",
apiClient: brainclient.New(brainclient.Options{
URL: srv.URL,
Key: "test-key",
HTTPClient: srv.Client(),
MaxAttempts: 1,
BaseDelay: time.Nanosecond,
}),
}
}
// jsonHandler returns an http.Handler that responds with the given JSON payload.
@ -151,7 +163,7 @@ func TestDirect_ApiCall_Bad_ServerError(t *testing.T) {
require.False(t, result.OK)
err, _ := result.Value.(error)
require.Error(t, err)
assert.Contains(t, err.Error(), "API call failed")
assert.Contains(t, err.Error(), "upstream returned 500")
}
func TestDirect_ApiCall_Bad_InvalidJSON(t *testing.T) {
@ -169,12 +181,21 @@ func TestDirect_ApiCall_Bad_InvalidJSON(t *testing.T) {
}
func TestDirect_ApiCall_Bad_ConnectionRefused(t *testing.T) {
sub := &DirectSubsystem{apiURL: "http://127.0.0.1:1", apiKey: "test-key"}
sub := &DirectSubsystem{
apiURL: "http://127.0.0.1:1",
apiKey: "test-key",
apiClient: brainclient.New(brainclient.Options{
URL: "http://127.0.0.1:1",
Key: "test-key",
MaxAttempts: 1,
BaseDelay: time.Nanosecond,
}),
}
result := sub.apiCall(context.Background(), "GET", "/v1/test", nil)
require.False(t, result.OK)
err, _ := result.Value.(error)
require.Error(t, err)
assert.Contains(t, err.Error(), "API call failed")
assert.Contains(t, err.Error(), "request failed")
}
func TestDirect_ApiCall_Bad_BadRequest(t *testing.T) {
@ -185,12 +206,14 @@ func TestDirect_ApiCall_Bad_BadRequest(t *testing.T) {
require.False(t, result.OK)
err, _ := result.Value.(error)
require.Error(t, err)
assert.Contains(t, err.Error(), "API call failed")
assert.Contains(t, err.Error(), "upstream returned 400")
}
// --- remember ---
func TestDirect_Remember_Good(t *testing.T) {
t.Setenv("CORE_BRAIN_AGENT_ID", "codex")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "POST", r.Method)
assert.Equal(t, "/v1/brain/remember", r.URL.Path)
@ -199,6 +222,8 @@ func TestDirect_Remember_Good(t *testing.T) {
core.JSONUnmarshalString(core.ReadAll(r.Body).Value.(string), &body)
assert.Equal(t, "test content", body["content"])
assert.Equal(t, "observation", body["type"])
assert.Equal(t, "core", body["org"])
assert.Equal(t, "codex", body["agent_id"])
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(core.JSONMarshalString(map[string]any{
@ -211,6 +236,7 @@ func TestDirect_Remember_Good(t *testing.T) {
Content: "test content",
Type: "observation",
Tags: []string{"test"},
Org: "core",
Project: "core",
})
require.NoError(t, err)
@ -254,6 +280,7 @@ func TestDirect_Recall_Good_WithMemories(t *testing.T) {
var body map[string]any
core.JSONUnmarshalString(core.ReadAll(r.Body).Value.(string), &body)
assert.Equal(t, "architecture", body["query"])
assert.Equal(t, "core", body["org"])
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(core.JSONMarshalString(map[string]any{
@ -287,6 +314,9 @@ func TestDirect_Recall_Good_WithMemories(t *testing.T) {
_, out, err := newTestDirect(srv).recall(context.Background(), nil, RecallInput{
Query: "architecture",
TopK: 5,
Filter: RecallFilter{
Org: "core",
},
})
require.NoError(t, err)
assert.True(t, out.Success)
@ -331,6 +361,7 @@ func TestDirect_Recall_Good_WithFilters(t *testing.T) {
var body map[string]any
core.JSONUnmarshalString(core.ReadAll(r.Body).Value.(string), &body)
assert.Equal(t, "cladius", body["agent_id"])
assert.Equal(t, "core", body["org"])
assert.Equal(t, "eaas", body["project"])
assert.Equal(t, "decision", body["type"])
@ -346,6 +377,7 @@ func TestDirect_Recall_Good_WithFilters(t *testing.T) {
TopK: 5,
Filter: RecallFilter{
AgentID: "cladius",
Org: "core",
Project: "eaas",
Type: "decision",
},
@ -412,6 +444,7 @@ func TestDirect_List_Good_WithMemories(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "GET", r.Method)
assert.Equal(t, "/v1/brain/list", r.URL.Path)
assert.Equal(t, "core", r.URL.Query().Get("org"))
assert.Equal(t, "agent", r.URL.Query().Get("project"))
assert.Equal(t, "decision", r.URL.Query().Get("type"))
assert.Equal(t, "codex", r.URL.Query().Get("agent_id"))
@ -453,6 +486,7 @@ func TestDirect_List_Good_WithMemories(t *testing.T) {
defer srv.Close()
_, out, err := newTestDirect(srv).list(context.Background(), nil, ListInput{
Org: "core",
Project: "agent",
Type: "decision",
AgentID: "codex",