feat(mcp/brain): OpenBrain T1+T2 — shared client + direct/brain-seed adoption (#175 #176)

#175 (T1/5 — shared Go HTTP client):
- pkg/mcp/brain/client/client.go: shared client with retry, circuit breaker, org propagation
- 30s default HTTP client, retry on network/5xx only, no retry on 4xx
- Minimal circuit breaker for repeated failures
- pkg/mcp/brain/client/client_test.go: httptest coverage

#176 (T2/5 — direct subsystem + binaries adopt shared client):
- pkg/mcp/brain/direct.go: refactored to delegate through *client.Client
- pkg/mcp/brain/direct_test.go: tests updated to use brainclient.New
- cmd/brain-seed/main.go: uses /v1/brain/remember via shared client with -org

Verification: go test ./pkg/mcp/brain/client/... passed; go build cmd/brain-seed passed.

Co-authored-by: Codex <noreply@openai.com>
Closes tasks.lthn.sh/view.php?id=175
Closes tasks.lthn.sh/view.php?id=176
This commit is contained in:
Snider 2026-04-25 15:42:05 +01:00
parent 2910a0d588
commit 44206708f9
5 changed files with 823 additions and 186 deletions

View file

@ -1,40 +1,38 @@
// SPDX-License-Identifier: EUPL-1.2
// brain-seed imports Claude Code MEMORY.md files into the OpenBrain knowledge
// store via the MCP HTTP API (brain_remember tool). The Laravel app handles
// store via the shared OpenBrain HTTP client. The Laravel app handles
// embedding, Qdrant storage, and MariaDB dual-write internally.
//
// Usage:
//
// go run ./cmd/brain-seed -api-key YOUR_KEY
// go run ./cmd/brain-seed -api-key YOUR_KEY -api https://lthn.sh/api/v1/mcp
// go run ./cmd/brain-seed -api-key YOUR_KEY -api https://api.lthn.sh
// go run ./cmd/brain-seed -api-key YOUR_KEY -dry-run
// go run ./cmd/brain-seed -api-key YOUR_KEY -plans
// go run ./cmd/brain-seed -api-key YOUR_KEY -claude-md # Also import CLAUDE.md files
package main
import (
"crypto/tls"
"encoding/json"
"context"
"flag"
goio "io"
"net/http"
"os"
"path/filepath"
"regexp"
"time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
brainclient "dappco.re/go/mcp/pkg/mcp/brain/client"
)
const seedDivider = "======================================================="
var (
apiURL = flag.String("api", "https://lthn.sh/api/v1/mcp", "MCP API base URL")
apiKey = flag.String("api-key", "", "MCP API key (Bearer token)")
server = flag.String("server", "hosthub-agent", "MCP server ID")
apiURL = flag.String("api", brainclient.DefaultURL, "OpenBrain API base URL")
apiKey = flag.String("api-key", core.Env("CORE_BRAIN_KEY"), "OpenBrain API key (Bearer token)")
server = flag.String("server", "hosthub-agent", "Legacy MCP server ID flag; accepted for compatibility")
org = flag.String("org", core.Env("CORE_BRAIN_ORG"), "OpenBrain org for seeded memories")
agent = flag.String("agent", "charon", "Agent ID for attribution")
dryRun = flag.Bool("dry-run", false, "Preview without storing")
plans = flag.Bool("plans", false, "Also import plan documents")
@ -45,19 +43,12 @@ var (
maxChars = flag.Int("max-chars", 3800, "Max chars per section (embeddinggemma limit ~4000)")
)
// httpClient with TLS skip for non-public TLDs (.lthn.sh has real certs, but
// allow .lan/.local if someone has legacy config).
var httpClient = &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: false},
},
}
var openbrain *brainclient.Client
func main() {
flag.Parse()
core.Println("OpenBrain Seed — MCP API Client")
core.Println("OpenBrain Seed — API Client")
core.Println(seedDivider)
if *apiKey == "" && !*dryRun {
@ -71,7 +62,14 @@ func main() {
}
core.Print(nil, "API: %s", *apiURL)
core.Print(nil, "Server: %s | Agent: %s", *server, *agent)
core.Print(nil, "Org: %s | Agent: %s", *org, *agent)
openbrain = brainclient.New(brainclient.Options{
URL: *apiURL,
Key: *apiKey,
Org: *org,
AgentID: *agent,
})
// Discover memory files
memPath := *memoryPath
@ -255,60 +253,30 @@ func main() {
core.Print(nil, "%sImported: %d | Skipped: %d | Errors: %d", prefix, imported, skipped, errors)
}
// callBrainRemember sends a memory to the MCP API via brain_remember tool.
// callBrainRemember sends a memory to OpenBrain via /v1/brain/remember.
func callBrainRemember(content, memType string, tags []string, project string, confidence float64) error {
args := map[string]any{
"content": content,
"type": memType,
"tags": tags,
"confidence": confidence,
if openbrain == nil {
openbrain = brainclient.New(brainclient.Options{
URL: *apiURL,
Key: *apiKey,
Org: *org,
AgentID: *agent,
})
}
input := brainclient.RememberInput{
Content: content,
Type: memType,
Tags: tags,
Org: *org,
AgentID: *agent,
Confidence: confidence,
}
if project != "" && project != "unknown" {
args["project"] = project
input.Project = project
}
payload := map[string]any{
"server": *server,
"tool": "brain_remember",
"arguments": args,
}
body, err := json.Marshal(payload)
if err != nil {
return coreerr.E("callBrainRemember", "marshal", err)
}
req, err := http.NewRequest("POST", *apiURL+"/tools/call", core.NewBuffer(body))
if err != nil {
return coreerr.E("callBrainRemember", "request", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+*apiKey)
resp, err := httpClient.Do(req)
if err != nil {
return coreerr.E("callBrainRemember", "http", err)
}
defer resp.Body.Close()
respBody, _ := goio.ReadAll(resp.Body)
if resp.StatusCode != 200 {
return coreerr.E("callBrainRemember", "HTTP "+string(respBody), nil)
}
var result struct {
Success bool `json:"success"`
Error string `json:"error"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return coreerr.E("callBrainRemember", "decode", err)
}
if !result.Success {
return coreerr.E("callBrainRemember", "API: "+result.Error, nil)
}
return nil
_, err := openbrain.Remember(context.Background(), input)
return coreerr.Wrap(err, "callBrainRemember", "remember")
}
// truncate caps content to maxLen chars, appending an ellipsis if truncated.

View file

@ -0,0 +1,500 @@
// SPDX-License-Identifier: EUPL-1.2
// Package client provides the shared OpenBrain HTTP client.
//
// c := client.New(client.Options{URL: core.Env("CORE_BRAIN_URL"), Key: core.Env("CORE_BRAIN_KEY")})
// _, err := c.Remember(ctx, client.RememberInput{
// Org: "core",
// Project: "mcp",
// Content: "Use one OpenBrain client for retry and circuit-breaker policy.",
// Type: "decision",
// })
package client
import (
"context"
"io"
"net/http"
"net/url"
"sync"
"time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
)
const (
DefaultURL = "https://api.lthn.sh"
defaultAgentID = "cladius"
defaultTimeout = 30 * time.Second
defaultMaxAttempts = 3
defaultBaseDelay = 100 * time.Millisecond
defaultFailureThreshold = 3
defaultSuccessThreshold = 1
defaultCircuitCooldown = 30 * time.Second
defaultMaxResponseBytes = int64(1 << 20)
defaultRecallTopK = 10
defaultListLimit = 50
)
// ErrCircuitOpen is returned when repeated upstream failures have opened the circuit.
var ErrCircuitOpen = core.NewError("brain client circuit open")
// Options configures the shared OpenBrain client.
type Options struct {
URL string
Key string
Org string
AgentID string
HTTPClient *http.Client
MaxAttempts int
BaseDelay time.Duration
MaxResponseBytes int64
CircuitBreaker *CircuitBreaker
}
// Client calls the Laravel /v1/brain/* API with shared retry and circuit policy.
type Client struct {
apiURL string
apiKey string
org string
agentID string
httpClient *http.Client
maxAttempts int
baseDelay time.Duration
maxResponseBytes int64
circuitBreaker *CircuitBreaker
}
// RememberInput is the request body for POST /v1/brain/remember.
type RememberInput struct {
Content string `json:"content"`
Type string `json:"type"`
Tags []string `json:"tags,omitempty"`
Org string `json:"org,omitempty"`
Project string `json:"project,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Confidence float64 `json:"confidence,omitempty"`
Supersedes string `json:"supersedes,omitempty"`
ExpiresIn int `json:"expires_in,omitempty"`
}
// RecallInput is the request body for POST /v1/brain/recall.
type RecallInput struct {
Query string `json:"query"`
TopK int `json:"top_k,omitempty"`
Org string `json:"org,omitempty"`
Project string `json:"project,omitempty"`
Type any `json:"type,omitempty"`
AgentID string `json:"agent_id,omitempty"`
MinConfidence float64 `json:"min_confidence,omitempty"`
}
// ForgetInput selects the memory removed by DELETE /v1/brain/forget/{id}.
type ForgetInput struct {
ID string `json:"id"`
Reason string `json:"reason,omitempty"`
}
// ListInput provides URL parameters for GET /v1/brain/list.
type ListInput struct {
Org string `json:"org,omitempty"`
Project string `json:"project,omitempty"`
Type string `json:"type,omitempty"`
AgentID string `json:"agent_id,omitempty"`
Limit int `json:"limit,omitempty"`
}
// CircuitState is the current breaker state.
type CircuitState string
const (
CircuitClosed CircuitState = "closed"
CircuitOpen CircuitState = "open"
CircuitHalfOpen CircuitState = "half_open"
)
// CircuitBreakerOptions controls when the circuit opens and recovers.
type CircuitBreakerOptions struct {
FailureThreshold int
SuccessThreshold int
Cooldown time.Duration
}
// CircuitBreaker protects OpenBrain from repeated failed calls.
type CircuitBreaker struct {
lock sync.Mutex
state CircuitState
failureThreshold int
successThreshold int
cooldown time.Duration
consecutiveFails int
consecutiveWins int
openedAt time.Time
halfOpenInFlight bool
}
// New creates a shared OpenBrain client.
func New(options Options) *Client {
apiURL := core.Trim(options.URL)
if apiURL == "" {
apiURL = DefaultURL
}
agentID := core.Trim(options.AgentID)
if agentID == "" {
agentID = defaultAgentID
}
httpClient := options.HTTPClient
if httpClient == nil {
httpClient = &http.Client{Timeout: defaultTimeout}
}
maxAttempts := options.MaxAttempts
if maxAttempts <= 0 {
maxAttempts = defaultMaxAttempts
}
baseDelay := options.BaseDelay
if baseDelay <= 0 {
baseDelay = defaultBaseDelay
}
maxResponseBytes := options.MaxResponseBytes
if maxResponseBytes <= 0 {
maxResponseBytes = defaultMaxResponseBytes
}
breaker := options.CircuitBreaker
if breaker == nil {
breaker = NewCircuitBreaker(CircuitBreakerOptions{})
}
return &Client{
apiURL: core.TrimSuffix(apiURL, "/"),
apiKey: core.Trim(options.Key),
org: core.Trim(options.Org),
agentID: agentID,
httpClient: httpClient,
maxAttempts: maxAttempts,
baseDelay: baseDelay,
maxResponseBytes: maxResponseBytes,
circuitBreaker: breaker,
}
}
// NewFromEnvironment reads CORE_BRAIN_* settings and ~/.claude/brain.key.
func NewFromEnvironment() *Client {
return New(Options{
URL: envOr("CORE_BRAIN_URL", DefaultURL),
Key: apiKeyFromEnvironment(),
Org: core.Env("CORE_BRAIN_ORG"),
AgentID: core.Env("CORE_BRAIN_AGENT_ID"),
})
}
// NewCircuitBreaker creates a circuit breaker with OpenBrain defaults.
func NewCircuitBreaker(options CircuitBreakerOptions) *CircuitBreaker {
failureThreshold := options.FailureThreshold
if failureThreshold <= 0 {
failureThreshold = defaultFailureThreshold
}
successThreshold := options.SuccessThreshold
if successThreshold <= 0 {
successThreshold = defaultSuccessThreshold
}
cooldown := options.Cooldown
if cooldown <= 0 {
cooldown = defaultCircuitCooldown
}
return &CircuitBreaker{
state: CircuitClosed,
failureThreshold: failureThreshold,
successThreshold: successThreshold,
cooldown: cooldown,
}
}
// State returns the current breaker state.
func (breaker *CircuitBreaker) State() CircuitState {
if breaker == nil {
return CircuitClosed
}
breaker.lock.Lock()
defer breaker.lock.Unlock()
return breaker.stateNow(time.Now())
}
// Remember stores a memory in OpenBrain.
func (c *Client) Remember(ctx context.Context, input RememberInput) (map[string]any, error) {
input.Org = c.orgFor(input.Org)
input.AgentID = c.agentFor(input.AgentID)
return c.Call(ctx, http.MethodPost, "/v1/brain/remember", input)
}
// Recall searches memories in OpenBrain.
func (c *Client) Recall(ctx context.Context, input RecallInput) (map[string]any, error) {
input.Org = c.orgFor(input.Org)
input.AgentID = c.agentFor(input.AgentID)
if input.TopK == 0 {
input.TopK = defaultRecallTopK
}
return c.Call(ctx, http.MethodPost, "/v1/brain/recall", input)
}
// Forget removes one memory from OpenBrain.
func (c *Client) Forget(ctx context.Context, input ForgetInput) (map[string]any, error) {
return c.Call(ctx, http.MethodDelete, core.Concat("/v1/brain/forget/", url.PathEscape(input.ID)), nil)
}
// List returns memories from OpenBrain using URL query filters.
func (c *Client) List(ctx context.Context, input ListInput) (map[string]any, error) {
input.Org = c.orgFor(input.Org)
if input.Limit == 0 {
input.Limit = defaultListLimit
}
values := url.Values{}
if input.Org != "" {
values.Set("org", input.Org)
}
if input.Project != "" {
values.Set("project", input.Project)
}
if input.Type != "" {
values.Set("type", input.Type)
}
if input.AgentID != "" {
values.Set("agent_id", input.AgentID)
}
values.Set("limit", core.Sprintf("%d", input.Limit))
return c.Call(ctx, http.MethodGet, core.Concat("/v1/brain/list?", values.Encode()), nil)
}
// Call performs one OpenBrain API request through retry and circuit-breaker policy.
func (c *Client) Call(ctx context.Context, method, path string, body any) (map[string]any, error) {
if c.apiKey == "" {
return nil, core.E("brain.client", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil)
}
if err := c.circuitBreaker.beforeRequest(); err != nil {
return nil, err
}
bodyString := ""
if body != nil {
bodyString = core.JSONMarshalString(body)
}
var lastErr error
for attempt := 1; attempt <= c.maxAttempts; attempt++ {
payload, retryable, err := c.doOnce(ctx, method, path, bodyString, body != nil)
if err == nil {
c.circuitBreaker.recordSuccess()
return payload, nil
}
lastErr = err
if !retryable {
c.circuitBreaker.recordIgnored()
break
}
c.circuitBreaker.recordFailure()
if c.circuitBreaker.State() == CircuitOpen || attempt == c.maxAttempts {
break
}
if sleepErr := c.sleep(ctx, attempt); sleepErr != nil {
lastErr = sleepErr
break
}
}
return nil, lastErr
}
func (c *Client) doOnce(ctx context.Context, method, path, bodyString string, hasBody bool) (map[string]any, bool, error) {
var reader io.Reader
if hasBody {
reader = core.NewReader(bodyString)
}
request, err := http.NewRequestWithContext(ctx, method, c.requestURL(path), reader)
if err != nil {
return nil, false, core.E("brain.client", "create request", err)
}
request.Header.Set("Accept", "application/json")
request.Header.Set("Authorization", core.Concat("Bearer ", c.apiKey))
if hasBody {
request.Header.Set("Content-Type", "application/json")
}
response, err := c.httpClient.Do(request)
if err != nil {
if ctx.Err() != nil {
return nil, false, core.E("brain.client", "request cancelled", ctx.Err())
}
return nil, true, core.E("brain.client", "request failed", err)
}
defer response.Body.Close()
readResult := core.ReadAll(io.LimitReader(response.Body, c.maxResponseBytes+1))
if !readResult.OK {
if readErr, ok := readResult.Value.(error); ok {
return nil, false, core.E("brain.client", "read response", readErr)
}
return nil, false, core.E("brain.client", "read response", nil)
}
raw := readResult.Value.(string)
if int64(len(raw)) > c.maxResponseBytes {
return nil, false, core.E("brain.client", "response too large", nil)
}
if response.StatusCode >= http.StatusBadRequest {
return nil, retryableStatus(response.StatusCode), core.E("brain.client", core.Concat("upstream returned ", response.Status, ": ", core.Trim(raw)), nil)
}
result := map[string]any{}
if parseResult := core.JSONUnmarshalString(raw, &result); !parseResult.OK {
if parseErr, ok := parseResult.Value.(error); ok {
return nil, false, core.E("brain.client", "parse response", parseErr)
}
return nil, false, core.E("brain.client", "parse response", nil)
}
return result, false, nil
}
func (c *Client) requestURL(path string) string {
if core.HasPrefix(path, "http://") || core.HasPrefix(path, "https://") {
return path
}
if !core.HasPrefix(path, "/") {
path = core.Concat("/", path)
}
return core.Concat(c.apiURL, path)
}
func (c *Client) sleep(ctx context.Context, attempt int) error {
delay := c.baseDelay
for i := 1; i < attempt; i++ {
delay *= 3
}
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return core.E("brain.client", "request cancelled", ctx.Err())
case <-timer.C:
return nil
}
}
func (c *Client) orgFor(org string) string {
org = core.Trim(org)
if org != "" {
return org
}
return c.org
}
func (c *Client) agentFor(agentID string) string {
agentID = core.Trim(agentID)
if agentID != "" {
return agentID
}
return c.agentID
}
func (breaker *CircuitBreaker) beforeRequest() error {
if breaker == nil {
return nil
}
breaker.lock.Lock()
defer breaker.lock.Unlock()
state := breaker.stateNow(time.Now())
if state == CircuitOpen {
return ErrCircuitOpen
}
if state == CircuitHalfOpen {
if breaker.halfOpenInFlight {
return ErrCircuitOpen
}
breaker.halfOpenInFlight = true
}
return nil
}
func (breaker *CircuitBreaker) recordSuccess() {
if breaker == nil {
return
}
breaker.lock.Lock()
defer breaker.lock.Unlock()
breaker.halfOpenInFlight = false
breaker.consecutiveFails = 0
breaker.consecutiveWins++
if breaker.state == CircuitHalfOpen && breaker.consecutiveWins >= breaker.successThreshold {
breaker.state = CircuitClosed
breaker.consecutiveWins = 0
}
if breaker.state == CircuitClosed {
breaker.consecutiveWins = 0
}
}
func (breaker *CircuitBreaker) recordFailure() {
if breaker == nil {
return
}
breaker.lock.Lock()
defer breaker.lock.Unlock()
breaker.halfOpenInFlight = false
breaker.consecutiveWins = 0
breaker.consecutiveFails++
if breaker.state == CircuitHalfOpen || breaker.consecutiveFails >= breaker.failureThreshold {
breaker.state = CircuitOpen
breaker.openedAt = time.Now()
}
}
func (breaker *CircuitBreaker) recordIgnored() {
if breaker == nil {
return
}
breaker.lock.Lock()
defer breaker.lock.Unlock()
breaker.halfOpenInFlight = false
}
func (breaker *CircuitBreaker) stateNow(now time.Time) CircuitState {
if breaker.state == "" {
breaker.state = CircuitClosed
}
if breaker.state == CircuitOpen && now.Sub(breaker.openedAt) >= breaker.cooldown {
breaker.state = CircuitHalfOpen
breaker.consecutiveFails = 0
breaker.consecutiveWins = 0
breaker.halfOpenInFlight = false
}
return breaker.state
}
func retryableStatus(statusCode int) bool {
return statusCode >= http.StatusInternalServerError
}
func envOr(key, fallback string) string {
value := core.Env(key)
if value != "" {
return value
}
return fallback
}
func apiKeyFromEnvironment() string {
if apiKey := core.Trim(core.Env("CORE_BRAIN_KEY")); apiKey != "" {
return apiKey
}
home := core.Env("HOME")
if home == "" {
return ""
}
if data, err := coreio.Local.Read(core.JoinPath(home, ".claude", "brain.key")); err == nil {
return core.Trim(data)
}
return ""
}

View file

@ -0,0 +1,218 @@
// SPDX-License-Identifier: EUPL-1.2
package client
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
core "dappco.re/go/core"
)
func TestClientRemember_Good_SendsOrgAndAuth(t *testing.T) {
var gotBody map[string]any
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
t.Fatalf("expected POST, got %s", r.Method)
}
if r.URL.Path != "/v1/brain/remember" {
t.Fatalf("expected /v1/brain/remember, got %s", r.URL.Path)
}
if r.Header.Get("Authorization") != "Bearer test-key" {
t.Fatalf("expected bearer token, got %q", r.Header.Get("Authorization"))
}
gotBody = readRequestBody(t, r)
writeJSON(t, w, http.StatusOK, map[string]any{"id": "mem-1"})
}))
defer server.Close()
c := New(Options{
URL: server.URL,
Key: "test-key",
Org: "core",
AgentID: "codex",
HTTPClient: server.Client(),
MaxAttempts: 1,
})
result, err := c.Remember(context.Background(), RememberInput{
Content: "remember org",
Type: "decision",
Project: "mcp",
})
if err != nil {
t.Fatalf("Remember failed: %v", err)
}
if result["id"] != "mem-1" {
t.Fatalf("expected id mem-1, got %v", result["id"])
}
if gotBody["org"] != "core" {
t.Fatalf("expected org=core, got %v", gotBody["org"])
}
if gotBody["project"] != "mcp" {
t.Fatalf("expected project=mcp, got %v", gotBody["project"])
}
if gotBody["agent_id"] != "codex" {
t.Fatalf("expected agent_id=codex, got %v", gotBody["agent_id"])
}
}
func TestClientList_Good_SendsOrgURLParam(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Fatalf("expected GET, got %s", r.Method)
}
if r.URL.Path != "/v1/brain/list" {
t.Fatalf("expected /v1/brain/list, got %s", r.URL.Path)
}
if got := r.URL.Query().Get("org"); got != "core" {
t.Fatalf("expected org=core, got %q", got)
}
if got := r.URL.Query().Get("project"); got != "mcp" {
t.Fatalf("expected project=mcp, got %q", got)
}
if got := r.URL.Query().Get("limit"); got != "50" {
t.Fatalf("expected default limit=50, got %q", got)
}
writeJSON(t, w, http.StatusOK, map[string]any{"memories": []any{}})
}))
defer server.Close()
c := New(Options{URL: server.URL, Key: "test-key", Org: "core", HTTPClient: server.Client(), MaxAttempts: 1})
if _, err := c.List(context.Background(), ListInput{Project: "mcp"}); err != nil {
t.Fatalf("List failed: %v", err)
}
}
func TestClientCall_Good_Retries503ThenSucceeds(t *testing.T) {
attempts := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts++
if attempts == 1 {
writeJSON(t, w, http.StatusServiceUnavailable, map[string]any{"error": "down"})
return
}
writeJSON(t, w, http.StatusOK, map[string]any{"memories": []any{}})
}))
defer server.Close()
c := New(Options{
URL: server.URL,
Key: "test-key",
HTTPClient: server.Client(),
MaxAttempts: 3,
BaseDelay: time.Nanosecond,
})
if _, err := c.Recall(context.Background(), RecallInput{Query: "retry"}); err != nil {
t.Fatalf("Recall failed after retry: %v", err)
}
if attempts != 2 {
t.Fatalf("expected 2 attempts, got %d", attempts)
}
}
func TestClientCall_Bad_DoesNotRetry400(t *testing.T) {
attempts := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts++
writeJSON(t, w, http.StatusBadRequest, map[string]any{"error": "bad request"})
}))
defer server.Close()
c := New(Options{
URL: server.URL,
Key: "test-key",
HTTPClient: server.Client(),
MaxAttempts: 3,
BaseDelay: time.Nanosecond,
})
if _, err := c.Recall(context.Background(), RecallInput{Query: "bad"}); err == nil {
t.Fatal("expected 400 error")
}
if attempts != 1 {
t.Fatalf("expected one attempt for 400, got %d", attempts)
}
}
func TestClientCall_Bad_Continuous503OpensCircuit(t *testing.T) {
attempts := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts++
writeJSON(t, w, http.StatusServiceUnavailable, map[string]any{"error": "down"})
}))
defer server.Close()
breaker := NewCircuitBreaker(CircuitBreakerOptions{
FailureThreshold: 3,
SuccessThreshold: 1,
Cooldown: time.Hour,
})
c := New(Options{
URL: server.URL,
Key: "test-key",
HTTPClient: server.Client(),
MaxAttempts: 3,
BaseDelay: time.Nanosecond,
CircuitBreaker: breaker,
})
if _, err := c.Recall(context.Background(), RecallInput{Query: "down"}); err == nil {
t.Fatal("expected 503 error")
}
if breaker.State() != CircuitOpen {
t.Fatalf("expected circuit open, got %s", breaker.State())
}
if _, err := c.Recall(context.Background(), RecallInput{Query: "down"}); !core.Is(err, ErrCircuitOpen) {
t.Fatalf("expected ErrCircuitOpen, got %v", err)
}
if attempts != 3 {
t.Fatalf("expected no network attempt after circuit open, got %d attempts", attempts)
}
}
func TestClientCall_Bad_ContextCancellation(t *testing.T) {
attempts := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts++
writeJSON(t, w, http.StatusOK, map[string]any{"ok": true})
}))
defer server.Close()
c := New(Options{URL: server.URL, Key: "test-key", HTTPClient: server.Client(), MaxAttempts: 3})
ctx, cancel := context.WithCancel(context.Background())
cancel()
if _, err := c.Recall(ctx, RecallInput{Query: "cancelled"}); !core.Is(err, context.Canceled) {
t.Fatalf("expected context.Canceled, got %v", err)
}
if attempts != 0 {
t.Fatalf("expected cancelled request to avoid network, got %d attempts", attempts)
}
}
func readRequestBody(t *testing.T, r *http.Request) map[string]any {
t.Helper()
readResult := core.ReadAll(r.Body)
if !readResult.OK {
t.Fatalf("failed to read body: %v", readResult.Value)
}
body := map[string]any{}
if decodeResult := core.JSONUnmarshalString(readResult.Value.(string), &body); !decodeResult.OK {
t.Fatalf("failed to decode body: %v", decodeResult.Value)
}
return body
}
func writeJSON(t *testing.T, w http.ResponseWriter, status int, payload any) {
t.Helper()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
if _, err := w.Write([]byte(core.JSONMarshalString(payload))); err != nil {
t.Fatalf("failed to write response: %v", err)
}
}

View file

@ -4,14 +4,11 @@ package brain
import (
"context"
"net/http"
"net/url"
"time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp"
brainclient "dappco.re/go/mcp/pkg/mcp/brain/client"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
@ -24,9 +21,7 @@ type channelSender func(ctx context.Context, channel string, data any)
// Unlike Subsystem (which uses the IDE WebSocket bridge), this calls the
// Laravel API directly — suitable for standalone core-mcp usage.
type DirectSubsystem struct {
apiURL string
apiKey string
client *http.Client
apiClient *brainclient.Client
onChannel channelSender
}
@ -53,24 +48,17 @@ func (s *DirectSubsystem) OnChannel(fn func(ctx context.Context, channel string,
// Reads CORE_BRAIN_URL and CORE_BRAIN_KEY from environment, or falls back
// to ~/.claude/brain.key for the API key.
func NewDirect() *DirectSubsystem {
apiURL := core.Env("CORE_BRAIN_URL")
if apiURL == "" {
apiURL = "https://api.lthn.sh"
}
return NewDirectWithClient(brainclient.NewFromEnvironment())
}
apiKey := core.Env("CORE_BRAIN_KEY")
if apiKey == "" {
home := core.Env("HOME")
if data, err := coreio.Local.Read(core.Path(home, ".claude", "brain.key")); err == nil {
apiKey = core.Trim(data)
}
}
return &DirectSubsystem{
apiURL: apiURL,
apiKey: apiKey,
client: &http.Client{Timeout: 30 * time.Second},
// NewDirectWithClient creates a direct brain subsystem using the shared client.
//
// brain := NewDirectWithClient(client.New(client.Options{URL: "http://127.0.0.1:8080", Key: "test"}))
func NewDirectWithClient(apiClient *brainclient.Client) *DirectSubsystem {
if apiClient == nil {
apiClient = brainclient.NewFromEnvironment()
}
return &DirectSubsystem{apiClient: apiClient}
}
// Name implements mcp.Subsystem.
@ -104,58 +92,19 @@ func (s *DirectSubsystem) RegisterTools(svc *coremcp.Service) {
func (s *DirectSubsystem) Shutdown(_ context.Context) error { return nil }
func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body any) (map[string]any, error) {
if s.apiKey == "" {
return nil, coreerr.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil)
}
var bodyStr string
if body != nil {
bodyStr = core.JSONMarshalString(body)
}
req, err := http.NewRequestWithContext(ctx, method, s.apiURL+path, core.NewReader(bodyStr))
if err != nil {
return nil, coreerr.E("brain.apiCall", "create request", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", "Bearer "+s.apiKey)
resp, err := s.client.Do(req)
if err != nil {
return nil, coreerr.E("brain.apiCall", "API call failed", err)
}
defer resp.Body.Close()
r := core.ReadAll(resp.Body)
if !r.OK {
if readErr, ok := r.Value.(error); ok {
return nil, coreerr.E("brain.apiCall", "read response", readErr)
}
return nil, coreerr.E("brain.apiCall", "read response failed", nil)
}
respData := r.Value.(string)
if resp.StatusCode >= 400 {
return nil, coreerr.E("brain.apiCall", "API returned "+respData, nil)
}
var result map[string]any
if ur := core.JSONUnmarshal([]byte(respData), &result); !ur.OK {
return nil, coreerr.E("brain.apiCall", "parse response", nil)
}
return result, nil
return s.client().Call(ctx, method, path, body)
}
func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) {
result, err := s.apiCall(ctx, "POST", "/v1/brain/remember", map[string]any{
"content": input.Content,
"type": input.Type,
"tags": input.Tags,
"org": input.Org,
"project": input.Project,
"agent_id": "cladius",
result, err := s.client().Remember(ctx, brainclient.RememberInput{
Content: input.Content,
Type: input.Type,
Tags: input.Tags,
Org: input.Org,
Project: input.Project,
Confidence: input.Confidence,
Supersedes: input.Supersedes,
ExpiresIn: input.ExpiresIn,
})
if err != nil {
return nil, RememberOutput{}, err
@ -178,25 +127,15 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest,
}
func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, input RecallInput) (*mcp.CallToolResult, RecallOutput, error) {
body := map[string]any{
"query": input.Query,
"top_k": input.TopK,
"agent_id": "cladius",
}
if input.Filter.Project != "" {
body["project"] = input.Filter.Project
}
if input.Filter.Org != "" {
body["org"] = input.Filter.Org
}
if input.Filter.Type != nil {
body["type"] = input.Filter.Type
}
if input.TopK == 0 {
body["top_k"] = 10
}
result, err := s.apiCall(ctx, "POST", "/v1/brain/recall", body)
result, err := s.client().Recall(ctx, brainclient.RecallInput{
Query: input.Query,
TopK: input.TopK,
Org: input.Filter.Org,
Project: input.Filter.Project,
Type: input.Filter.Type,
AgentID: input.Filter.AgentID,
MinConfidence: input.Filter.MinConfidence,
})
if err != nil {
return nil, RecallOutput{}, err
}
@ -219,7 +158,7 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in
}
func (s *DirectSubsystem) forget(ctx context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) {
_, err := s.apiCall(ctx, "DELETE", "/v1/brain/forget/"+input.ID, nil)
_, err := s.client().Forget(ctx, brainclient.ForgetInput{ID: input.ID, Reason: input.Reason})
if err != nil {
return nil, ForgetOutput{}, err
}
@ -243,23 +182,13 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu
if limit == 0 {
limit = 50
}
values := url.Values{}
if input.Org != "" {
values.Set("org", input.Org)
}
if input.Project != "" {
values.Set("project", input.Project)
}
if input.Type != "" {
values.Set("type", input.Type)
}
if input.AgentID != "" {
values.Set("agent_id", input.AgentID)
}
values.Set("limit", core.Sprintf("%d", limit))
result, err := s.apiCall(ctx, http.MethodGet, "/v1/brain/list?"+values.Encode(), nil)
result, err := s.client().List(ctx, brainclient.ListInput{
Org: input.Org,
Project: input.Project,
Type: input.Type,
AgentID: input.AgentID,
Limit: limit,
})
if err != nil {
return nil, ListOutput{}, err
}
@ -283,6 +212,13 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu
}, nil
}
func (s *DirectSubsystem) client() *brainclient.Client {
if s.apiClient == nil {
s.apiClient = brainclient.NewFromEnvironment()
}
return s.apiClient
}
// memoriesFromResult extracts Memory entries from an API response map.
func memoriesFromResult(result map[string]any) []Memory {
var memories []Memory

View file

@ -8,14 +8,21 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
brainclient "dappco.re/go/mcp/pkg/mcp/brain/client"
)
// newTestDirect creates a DirectSubsystem pointing at a test server.
func newTestDirect(url string) *DirectSubsystem {
return &DirectSubsystem{
apiURL: url,
apiKey: "test-key",
client: http.DefaultClient,
apiClient: brainclient.New(brainclient.Options{
URL: url,
Key: "test-key",
HTTPClient: http.DefaultClient,
MaxAttempts: 1,
BaseDelay: time.Nanosecond,
}),
}
}
@ -84,7 +91,12 @@ func TestApiCall_Good_GetNilBody(t *testing.T) {
}
func TestApiCall_Bad_NoApiKey(t *testing.T) {
s := &DirectSubsystem{apiKey: "", client: http.DefaultClient}
s := &DirectSubsystem{apiClient: brainclient.New(brainclient.Options{
URL: "http://example.test",
Key: "",
HTTPClient: http.DefaultClient,
MaxAttempts: 1,
})}
_, err := s.apiCall(context.Background(), "GET", "/test", nil)
if err == nil {
t.Error("expected error when apiKey is empty")
@ -121,9 +133,12 @@ func TestApiCall_Bad_InvalidJson(t *testing.T) {
func TestApiCall_Bad_Unreachable(t *testing.T) {
s := &DirectSubsystem{
apiURL: "http://127.0.0.1:1", // nothing listening
apiKey: "key",
client: http.DefaultClient,
apiClient: brainclient.New(brainclient.Options{
URL: "http://127.0.0.1:1", // nothing listening
Key: "key",
HTTPClient: http.DefaultClient,
MaxAttempts: 1,
}),
}
_, err := s.apiCall(context.Background(), "GET", "/test", nil)
if err == nil {