From 44206708f9b486fa6c74b337dce43145b9e29694 Mon Sep 17 00:00:00 2001 From: Snider Date: Sat, 25 Apr 2026 15:42:05 +0100 Subject: [PATCH] =?UTF-8?q?feat(mcp/brain):=20OpenBrain=20T1+T2=20?= =?UTF-8?q?=E2=80=94=20shared=20client=20+=20direct/brain-seed=20adoption?= =?UTF-8?q?=20(#175=20#176)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #175 (T1/5 — shared Go HTTP client): - pkg/mcp/brain/client/client.go: shared client with retry, circuit breaker, org propagation - 30s default HTTP client, retry on network/5xx only, no retry on 4xx - Minimal circuit breaker for repeated failures - pkg/mcp/brain/client/client_test.go: httptest coverage #176 (T2/5 — direct subsystem + binaries adopt shared client): - pkg/mcp/brain/direct.go: refactored to delegate through *client.Client - pkg/mcp/brain/direct_test.go: tests updated to use brainclient.New - cmd/brain-seed/main.go: uses /v1/brain/remember via shared client with -org Verification: go test ./pkg/mcp/brain/client/... passed; go build cmd/brain-seed passed. Co-authored-by: Codex Closes tasks.lthn.sh/view.php?id=175 Closes tasks.lthn.sh/view.php?id=176 --- cmd/brain-seed/main.go | 108 +++--- pkg/mcp/brain/client/client.go | 500 ++++++++++++++++++++++++++++ pkg/mcp/brain/client/client_test.go | 218 ++++++++++++ pkg/mcp/brain/direct.go | 154 +++------ pkg/mcp/brain/direct_test.go | 29 +- 5 files changed, 823 insertions(+), 186 deletions(-) create mode 100644 pkg/mcp/brain/client/client.go create mode 100644 pkg/mcp/brain/client/client_test.go diff --git a/cmd/brain-seed/main.go b/cmd/brain-seed/main.go index 6079f82..a8edb26 100644 --- a/cmd/brain-seed/main.go +++ b/cmd/brain-seed/main.go @@ -1,40 +1,38 @@ // SPDX-License-Identifier: EUPL-1.2 // brain-seed imports Claude Code MEMORY.md files into the OpenBrain knowledge -// store via the MCP HTTP API (brain_remember tool). The Laravel app handles +// store via the shared OpenBrain HTTP client. The Laravel app handles // embedding, Qdrant storage, and MariaDB dual-write internally. // // Usage: // // go run ./cmd/brain-seed -api-key YOUR_KEY -// go run ./cmd/brain-seed -api-key YOUR_KEY -api https://lthn.sh/api/v1/mcp +// go run ./cmd/brain-seed -api-key YOUR_KEY -api https://api.lthn.sh // go run ./cmd/brain-seed -api-key YOUR_KEY -dry-run // go run ./cmd/brain-seed -api-key YOUR_KEY -plans // go run ./cmd/brain-seed -api-key YOUR_KEY -claude-md # Also import CLAUDE.md files package main import ( - "crypto/tls" - "encoding/json" + "context" "flag" - goio "io" - "net/http" "os" "path/filepath" "regexp" - "time" core "dappco.re/go/core" coreio "dappco.re/go/core/io" coreerr "dappco.re/go/core/log" + brainclient "dappco.re/go/mcp/pkg/mcp/brain/client" ) const seedDivider = "=======================================================" var ( - apiURL = flag.String("api", "https://lthn.sh/api/v1/mcp", "MCP API base URL") - apiKey = flag.String("api-key", "", "MCP API key (Bearer token)") - server = flag.String("server", "hosthub-agent", "MCP server ID") + apiURL = flag.String("api", brainclient.DefaultURL, "OpenBrain API base URL") + apiKey = flag.String("api-key", core.Env("CORE_BRAIN_KEY"), "OpenBrain API key (Bearer token)") + server = flag.String("server", "hosthub-agent", "Legacy MCP server ID flag; accepted for compatibility") + org = flag.String("org", core.Env("CORE_BRAIN_ORG"), "OpenBrain org for seeded memories") agent = flag.String("agent", "charon", "Agent ID for attribution") dryRun = flag.Bool("dry-run", false, "Preview without storing") plans = flag.Bool("plans", false, "Also import plan documents") @@ -45,19 +43,12 @@ var ( maxChars = flag.Int("max-chars", 3800, "Max chars per section (embeddinggemma limit ~4000)") ) -// httpClient with TLS skip for non-public TLDs (.lthn.sh has real certs, but -// allow .lan/.local if someone has legacy config). -var httpClient = &http.Client{ - Timeout: 30 * time.Second, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: false}, - }, -} +var openbrain *brainclient.Client func main() { flag.Parse() - core.Println("OpenBrain Seed — MCP API Client") + core.Println("OpenBrain Seed — API Client") core.Println(seedDivider) if *apiKey == "" && !*dryRun { @@ -71,7 +62,14 @@ func main() { } core.Print(nil, "API: %s", *apiURL) - core.Print(nil, "Server: %s | Agent: %s", *server, *agent) + core.Print(nil, "Org: %s | Agent: %s", *org, *agent) + + openbrain = brainclient.New(brainclient.Options{ + URL: *apiURL, + Key: *apiKey, + Org: *org, + AgentID: *agent, + }) // Discover memory files memPath := *memoryPath @@ -255,60 +253,30 @@ func main() { core.Print(nil, "%sImported: %d | Skipped: %d | Errors: %d", prefix, imported, skipped, errors) } -// callBrainRemember sends a memory to the MCP API via brain_remember tool. +// callBrainRemember sends a memory to OpenBrain via /v1/brain/remember. func callBrainRemember(content, memType string, tags []string, project string, confidence float64) error { - args := map[string]any{ - "content": content, - "type": memType, - "tags": tags, - "confidence": confidence, + if openbrain == nil { + openbrain = brainclient.New(brainclient.Options{ + URL: *apiURL, + Key: *apiKey, + Org: *org, + AgentID: *agent, + }) + } + + input := brainclient.RememberInput{ + Content: content, + Type: memType, + Tags: tags, + Org: *org, + AgentID: *agent, + Confidence: confidence, } if project != "" && project != "unknown" { - args["project"] = project + input.Project = project } - - payload := map[string]any{ - "server": *server, - "tool": "brain_remember", - "arguments": args, - } - - body, err := json.Marshal(payload) - if err != nil { - return coreerr.E("callBrainRemember", "marshal", err) - } - - req, err := http.NewRequest("POST", *apiURL+"/tools/call", core.NewBuffer(body)) - if err != nil { - return coreerr.E("callBrainRemember", "request", err) - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+*apiKey) - - resp, err := httpClient.Do(req) - if err != nil { - return coreerr.E("callBrainRemember", "http", err) - } - defer resp.Body.Close() - - respBody, _ := goio.ReadAll(resp.Body) - - if resp.StatusCode != 200 { - return coreerr.E("callBrainRemember", "HTTP "+string(respBody), nil) - } - - var result struct { - Success bool `json:"success"` - Error string `json:"error"` - } - if err := json.Unmarshal(respBody, &result); err != nil { - return coreerr.E("callBrainRemember", "decode", err) - } - if !result.Success { - return coreerr.E("callBrainRemember", "API: "+result.Error, nil) - } - - return nil + _, err := openbrain.Remember(context.Background(), input) + return coreerr.Wrap(err, "callBrainRemember", "remember") } // truncate caps content to maxLen chars, appending an ellipsis if truncated. diff --git a/pkg/mcp/brain/client/client.go b/pkg/mcp/brain/client/client.go new file mode 100644 index 0000000..d9038e0 --- /dev/null +++ b/pkg/mcp/brain/client/client.go @@ -0,0 +1,500 @@ +// SPDX-License-Identifier: EUPL-1.2 + +// Package client provides the shared OpenBrain HTTP client. +// +// c := client.New(client.Options{URL: core.Env("CORE_BRAIN_URL"), Key: core.Env("CORE_BRAIN_KEY")}) +// _, err := c.Remember(ctx, client.RememberInput{ +// Org: "core", +// Project: "mcp", +// Content: "Use one OpenBrain client for retry and circuit-breaker policy.", +// Type: "decision", +// }) +package client + +import ( + "context" + "io" + "net/http" + "net/url" + "sync" + "time" + + core "dappco.re/go/core" + coreio "dappco.re/go/core/io" +) + +const ( + DefaultURL = "https://api.lthn.sh" + defaultAgentID = "cladius" + defaultTimeout = 30 * time.Second + defaultMaxAttempts = 3 + defaultBaseDelay = 100 * time.Millisecond + defaultFailureThreshold = 3 + defaultSuccessThreshold = 1 + defaultCircuitCooldown = 30 * time.Second + defaultMaxResponseBytes = int64(1 << 20) + defaultRecallTopK = 10 + defaultListLimit = 50 +) + +// ErrCircuitOpen is returned when repeated upstream failures have opened the circuit. +var ErrCircuitOpen = core.NewError("brain client circuit open") + +// Options configures the shared OpenBrain client. +type Options struct { + URL string + Key string + Org string + AgentID string + HTTPClient *http.Client + MaxAttempts int + BaseDelay time.Duration + MaxResponseBytes int64 + CircuitBreaker *CircuitBreaker +} + +// Client calls the Laravel /v1/brain/* API with shared retry and circuit policy. +type Client struct { + apiURL string + apiKey string + org string + agentID string + httpClient *http.Client + maxAttempts int + baseDelay time.Duration + maxResponseBytes int64 + circuitBreaker *CircuitBreaker +} + +// RememberInput is the request body for POST /v1/brain/remember. +type RememberInput struct { + Content string `json:"content"` + Type string `json:"type"` + Tags []string `json:"tags,omitempty"` + Org string `json:"org,omitempty"` + Project string `json:"project,omitempty"` + AgentID string `json:"agent_id,omitempty"` + Confidence float64 `json:"confidence,omitempty"` + Supersedes string `json:"supersedes,omitempty"` + ExpiresIn int `json:"expires_in,omitempty"` +} + +// RecallInput is the request body for POST /v1/brain/recall. +type RecallInput struct { + Query string `json:"query"` + TopK int `json:"top_k,omitempty"` + Org string `json:"org,omitempty"` + Project string `json:"project,omitempty"` + Type any `json:"type,omitempty"` + AgentID string `json:"agent_id,omitempty"` + MinConfidence float64 `json:"min_confidence,omitempty"` +} + +// ForgetInput selects the memory removed by DELETE /v1/brain/forget/{id}. +type ForgetInput struct { + ID string `json:"id"` + Reason string `json:"reason,omitempty"` +} + +// ListInput provides URL parameters for GET /v1/brain/list. +type ListInput struct { + Org string `json:"org,omitempty"` + Project string `json:"project,omitempty"` + Type string `json:"type,omitempty"` + AgentID string `json:"agent_id,omitempty"` + Limit int `json:"limit,omitempty"` +} + +// CircuitState is the current breaker state. +type CircuitState string + +const ( + CircuitClosed CircuitState = "closed" + CircuitOpen CircuitState = "open" + CircuitHalfOpen CircuitState = "half_open" +) + +// CircuitBreakerOptions controls when the circuit opens and recovers. +type CircuitBreakerOptions struct { + FailureThreshold int + SuccessThreshold int + Cooldown time.Duration +} + +// CircuitBreaker protects OpenBrain from repeated failed calls. +type CircuitBreaker struct { + lock sync.Mutex + state CircuitState + failureThreshold int + successThreshold int + cooldown time.Duration + consecutiveFails int + consecutiveWins int + openedAt time.Time + halfOpenInFlight bool +} + +// New creates a shared OpenBrain client. +func New(options Options) *Client { + apiURL := core.Trim(options.URL) + if apiURL == "" { + apiURL = DefaultURL + } + agentID := core.Trim(options.AgentID) + if agentID == "" { + agentID = defaultAgentID + } + httpClient := options.HTTPClient + if httpClient == nil { + httpClient = &http.Client{Timeout: defaultTimeout} + } + maxAttempts := options.MaxAttempts + if maxAttempts <= 0 { + maxAttempts = defaultMaxAttempts + } + baseDelay := options.BaseDelay + if baseDelay <= 0 { + baseDelay = defaultBaseDelay + } + maxResponseBytes := options.MaxResponseBytes + if maxResponseBytes <= 0 { + maxResponseBytes = defaultMaxResponseBytes + } + breaker := options.CircuitBreaker + if breaker == nil { + breaker = NewCircuitBreaker(CircuitBreakerOptions{}) + } + + return &Client{ + apiURL: core.TrimSuffix(apiURL, "/"), + apiKey: core.Trim(options.Key), + org: core.Trim(options.Org), + agentID: agentID, + httpClient: httpClient, + maxAttempts: maxAttempts, + baseDelay: baseDelay, + maxResponseBytes: maxResponseBytes, + circuitBreaker: breaker, + } +} + +// NewFromEnvironment reads CORE_BRAIN_* settings and ~/.claude/brain.key. +func NewFromEnvironment() *Client { + return New(Options{ + URL: envOr("CORE_BRAIN_URL", DefaultURL), + Key: apiKeyFromEnvironment(), + Org: core.Env("CORE_BRAIN_ORG"), + AgentID: core.Env("CORE_BRAIN_AGENT_ID"), + }) +} + +// NewCircuitBreaker creates a circuit breaker with OpenBrain defaults. +func NewCircuitBreaker(options CircuitBreakerOptions) *CircuitBreaker { + failureThreshold := options.FailureThreshold + if failureThreshold <= 0 { + failureThreshold = defaultFailureThreshold + } + successThreshold := options.SuccessThreshold + if successThreshold <= 0 { + successThreshold = defaultSuccessThreshold + } + cooldown := options.Cooldown + if cooldown <= 0 { + cooldown = defaultCircuitCooldown + } + return &CircuitBreaker{ + state: CircuitClosed, + failureThreshold: failureThreshold, + successThreshold: successThreshold, + cooldown: cooldown, + } +} + +// State returns the current breaker state. +func (breaker *CircuitBreaker) State() CircuitState { + if breaker == nil { + return CircuitClosed + } + breaker.lock.Lock() + defer breaker.lock.Unlock() + return breaker.stateNow(time.Now()) +} + +// Remember stores a memory in OpenBrain. +func (c *Client) Remember(ctx context.Context, input RememberInput) (map[string]any, error) { + input.Org = c.orgFor(input.Org) + input.AgentID = c.agentFor(input.AgentID) + return c.Call(ctx, http.MethodPost, "/v1/brain/remember", input) +} + +// Recall searches memories in OpenBrain. +func (c *Client) Recall(ctx context.Context, input RecallInput) (map[string]any, error) { + input.Org = c.orgFor(input.Org) + input.AgentID = c.agentFor(input.AgentID) + if input.TopK == 0 { + input.TopK = defaultRecallTopK + } + return c.Call(ctx, http.MethodPost, "/v1/brain/recall", input) +} + +// Forget removes one memory from OpenBrain. +func (c *Client) Forget(ctx context.Context, input ForgetInput) (map[string]any, error) { + return c.Call(ctx, http.MethodDelete, core.Concat("/v1/brain/forget/", url.PathEscape(input.ID)), nil) +} + +// List returns memories from OpenBrain using URL query filters. +func (c *Client) List(ctx context.Context, input ListInput) (map[string]any, error) { + input.Org = c.orgFor(input.Org) + if input.Limit == 0 { + input.Limit = defaultListLimit + } + values := url.Values{} + if input.Org != "" { + values.Set("org", input.Org) + } + if input.Project != "" { + values.Set("project", input.Project) + } + if input.Type != "" { + values.Set("type", input.Type) + } + if input.AgentID != "" { + values.Set("agent_id", input.AgentID) + } + values.Set("limit", core.Sprintf("%d", input.Limit)) + return c.Call(ctx, http.MethodGet, core.Concat("/v1/brain/list?", values.Encode()), nil) +} + +// Call performs one OpenBrain API request through retry and circuit-breaker policy. +func (c *Client) Call(ctx context.Context, method, path string, body any) (map[string]any, error) { + if c.apiKey == "" { + return nil, core.E("brain.client", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil) + } + if err := c.circuitBreaker.beforeRequest(); err != nil { + return nil, err + } + + bodyString := "" + if body != nil { + bodyString = core.JSONMarshalString(body) + } + + var lastErr error + for attempt := 1; attempt <= c.maxAttempts; attempt++ { + payload, retryable, err := c.doOnce(ctx, method, path, bodyString, body != nil) + if err == nil { + c.circuitBreaker.recordSuccess() + return payload, nil + } + + lastErr = err + if !retryable { + c.circuitBreaker.recordIgnored() + break + } + c.circuitBreaker.recordFailure() + if c.circuitBreaker.State() == CircuitOpen || attempt == c.maxAttempts { + break + } + if sleepErr := c.sleep(ctx, attempt); sleepErr != nil { + lastErr = sleepErr + break + } + } + + return nil, lastErr +} + +func (c *Client) doOnce(ctx context.Context, method, path, bodyString string, hasBody bool) (map[string]any, bool, error) { + var reader io.Reader + if hasBody { + reader = core.NewReader(bodyString) + } + request, err := http.NewRequestWithContext(ctx, method, c.requestURL(path), reader) + if err != nil { + return nil, false, core.E("brain.client", "create request", err) + } + request.Header.Set("Accept", "application/json") + request.Header.Set("Authorization", core.Concat("Bearer ", c.apiKey)) + if hasBody { + request.Header.Set("Content-Type", "application/json") + } + + response, err := c.httpClient.Do(request) + if err != nil { + if ctx.Err() != nil { + return nil, false, core.E("brain.client", "request cancelled", ctx.Err()) + } + return nil, true, core.E("brain.client", "request failed", err) + } + defer response.Body.Close() + + readResult := core.ReadAll(io.LimitReader(response.Body, c.maxResponseBytes+1)) + if !readResult.OK { + if readErr, ok := readResult.Value.(error); ok { + return nil, false, core.E("brain.client", "read response", readErr) + } + return nil, false, core.E("brain.client", "read response", nil) + } + raw := readResult.Value.(string) + if int64(len(raw)) > c.maxResponseBytes { + return nil, false, core.E("brain.client", "response too large", nil) + } + + if response.StatusCode >= http.StatusBadRequest { + return nil, retryableStatus(response.StatusCode), core.E("brain.client", core.Concat("upstream returned ", response.Status, ": ", core.Trim(raw)), nil) + } + + result := map[string]any{} + if parseResult := core.JSONUnmarshalString(raw, &result); !parseResult.OK { + if parseErr, ok := parseResult.Value.(error); ok { + return nil, false, core.E("brain.client", "parse response", parseErr) + } + return nil, false, core.E("brain.client", "parse response", nil) + } + return result, false, nil +} + +func (c *Client) requestURL(path string) string { + if core.HasPrefix(path, "http://") || core.HasPrefix(path, "https://") { + return path + } + if !core.HasPrefix(path, "/") { + path = core.Concat("/", path) + } + return core.Concat(c.apiURL, path) +} + +func (c *Client) sleep(ctx context.Context, attempt int) error { + delay := c.baseDelay + for i := 1; i < attempt; i++ { + delay *= 3 + } + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-ctx.Done(): + return core.E("brain.client", "request cancelled", ctx.Err()) + case <-timer.C: + return nil + } +} + +func (c *Client) orgFor(org string) string { + org = core.Trim(org) + if org != "" { + return org + } + return c.org +} + +func (c *Client) agentFor(agentID string) string { + agentID = core.Trim(agentID) + if agentID != "" { + return agentID + } + return c.agentID +} + +func (breaker *CircuitBreaker) beforeRequest() error { + if breaker == nil { + return nil + } + breaker.lock.Lock() + defer breaker.lock.Unlock() + + state := breaker.stateNow(time.Now()) + if state == CircuitOpen { + return ErrCircuitOpen + } + if state == CircuitHalfOpen { + if breaker.halfOpenInFlight { + return ErrCircuitOpen + } + breaker.halfOpenInFlight = true + } + return nil +} + +func (breaker *CircuitBreaker) recordSuccess() { + if breaker == nil { + return + } + breaker.lock.Lock() + defer breaker.lock.Unlock() + + breaker.halfOpenInFlight = false + breaker.consecutiveFails = 0 + breaker.consecutiveWins++ + if breaker.state == CircuitHalfOpen && breaker.consecutiveWins >= breaker.successThreshold { + breaker.state = CircuitClosed + breaker.consecutiveWins = 0 + } + if breaker.state == CircuitClosed { + breaker.consecutiveWins = 0 + } +} + +func (breaker *CircuitBreaker) recordFailure() { + if breaker == nil { + return + } + breaker.lock.Lock() + defer breaker.lock.Unlock() + + breaker.halfOpenInFlight = false + breaker.consecutiveWins = 0 + breaker.consecutiveFails++ + if breaker.state == CircuitHalfOpen || breaker.consecutiveFails >= breaker.failureThreshold { + breaker.state = CircuitOpen + breaker.openedAt = time.Now() + } +} + +func (breaker *CircuitBreaker) recordIgnored() { + if breaker == nil { + return + } + breaker.lock.Lock() + defer breaker.lock.Unlock() + breaker.halfOpenInFlight = false +} + +func (breaker *CircuitBreaker) stateNow(now time.Time) CircuitState { + if breaker.state == "" { + breaker.state = CircuitClosed + } + if breaker.state == CircuitOpen && now.Sub(breaker.openedAt) >= breaker.cooldown { + breaker.state = CircuitHalfOpen + breaker.consecutiveFails = 0 + breaker.consecutiveWins = 0 + breaker.halfOpenInFlight = false + } + return breaker.state +} + +func retryableStatus(statusCode int) bool { + return statusCode >= http.StatusInternalServerError +} + +func envOr(key, fallback string) string { + value := core.Env(key) + if value != "" { + return value + } + return fallback +} + +func apiKeyFromEnvironment() string { + if apiKey := core.Trim(core.Env("CORE_BRAIN_KEY")); apiKey != "" { + return apiKey + } + home := core.Env("HOME") + if home == "" { + return "" + } + if data, err := coreio.Local.Read(core.JoinPath(home, ".claude", "brain.key")); err == nil { + return core.Trim(data) + } + return "" +} diff --git a/pkg/mcp/brain/client/client_test.go b/pkg/mcp/brain/client/client_test.go new file mode 100644 index 0000000..d196477 --- /dev/null +++ b/pkg/mcp/brain/client/client_test.go @@ -0,0 +1,218 @@ +// SPDX-License-Identifier: EUPL-1.2 + +package client + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + core "dappco.re/go/core" +) + +func TestClientRemember_Good_SendsOrgAndAuth(t *testing.T) { + var gotBody map[string]any + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Fatalf("expected POST, got %s", r.Method) + } + if r.URL.Path != "/v1/brain/remember" { + t.Fatalf("expected /v1/brain/remember, got %s", r.URL.Path) + } + if r.Header.Get("Authorization") != "Bearer test-key" { + t.Fatalf("expected bearer token, got %q", r.Header.Get("Authorization")) + } + gotBody = readRequestBody(t, r) + writeJSON(t, w, http.StatusOK, map[string]any{"id": "mem-1"}) + })) + defer server.Close() + + c := New(Options{ + URL: server.URL, + Key: "test-key", + Org: "core", + AgentID: "codex", + HTTPClient: server.Client(), + MaxAttempts: 1, + }) + result, err := c.Remember(context.Background(), RememberInput{ + Content: "remember org", + Type: "decision", + Project: "mcp", + }) + if err != nil { + t.Fatalf("Remember failed: %v", err) + } + if result["id"] != "mem-1" { + t.Fatalf("expected id mem-1, got %v", result["id"]) + } + if gotBody["org"] != "core" { + t.Fatalf("expected org=core, got %v", gotBody["org"]) + } + if gotBody["project"] != "mcp" { + t.Fatalf("expected project=mcp, got %v", gotBody["project"]) + } + if gotBody["agent_id"] != "codex" { + t.Fatalf("expected agent_id=codex, got %v", gotBody["agent_id"]) + } +} + +func TestClientList_Good_SendsOrgURLParam(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + t.Fatalf("expected GET, got %s", r.Method) + } + if r.URL.Path != "/v1/brain/list" { + t.Fatalf("expected /v1/brain/list, got %s", r.URL.Path) + } + if got := r.URL.Query().Get("org"); got != "core" { + t.Fatalf("expected org=core, got %q", got) + } + if got := r.URL.Query().Get("project"); got != "mcp" { + t.Fatalf("expected project=mcp, got %q", got) + } + if got := r.URL.Query().Get("limit"); got != "50" { + t.Fatalf("expected default limit=50, got %q", got) + } + writeJSON(t, w, http.StatusOK, map[string]any{"memories": []any{}}) + })) + defer server.Close() + + c := New(Options{URL: server.URL, Key: "test-key", Org: "core", HTTPClient: server.Client(), MaxAttempts: 1}) + if _, err := c.List(context.Background(), ListInput{Project: "mcp"}); err != nil { + t.Fatalf("List failed: %v", err) + } +} + +func TestClientCall_Good_Retries503ThenSucceeds(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + if attempts == 1 { + writeJSON(t, w, http.StatusServiceUnavailable, map[string]any{"error": "down"}) + return + } + writeJSON(t, w, http.StatusOK, map[string]any{"memories": []any{}}) + })) + defer server.Close() + + c := New(Options{ + URL: server.URL, + Key: "test-key", + HTTPClient: server.Client(), + MaxAttempts: 3, + BaseDelay: time.Nanosecond, + }) + if _, err := c.Recall(context.Background(), RecallInput{Query: "retry"}); err != nil { + t.Fatalf("Recall failed after retry: %v", err) + } + if attempts != 2 { + t.Fatalf("expected 2 attempts, got %d", attempts) + } +} + +func TestClientCall_Bad_DoesNotRetry400(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + writeJSON(t, w, http.StatusBadRequest, map[string]any{"error": "bad request"}) + })) + defer server.Close() + + c := New(Options{ + URL: server.URL, + Key: "test-key", + HTTPClient: server.Client(), + MaxAttempts: 3, + BaseDelay: time.Nanosecond, + }) + if _, err := c.Recall(context.Background(), RecallInput{Query: "bad"}); err == nil { + t.Fatal("expected 400 error") + } + if attempts != 1 { + t.Fatalf("expected one attempt for 400, got %d", attempts) + } +} + +func TestClientCall_Bad_Continuous503OpensCircuit(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + writeJSON(t, w, http.StatusServiceUnavailable, map[string]any{"error": "down"}) + })) + defer server.Close() + + breaker := NewCircuitBreaker(CircuitBreakerOptions{ + FailureThreshold: 3, + SuccessThreshold: 1, + Cooldown: time.Hour, + }) + c := New(Options{ + URL: server.URL, + Key: "test-key", + HTTPClient: server.Client(), + MaxAttempts: 3, + BaseDelay: time.Nanosecond, + CircuitBreaker: breaker, + }) + + if _, err := c.Recall(context.Background(), RecallInput{Query: "down"}); err == nil { + t.Fatal("expected 503 error") + } + if breaker.State() != CircuitOpen { + t.Fatalf("expected circuit open, got %s", breaker.State()) + } + if _, err := c.Recall(context.Background(), RecallInput{Query: "down"}); !core.Is(err, ErrCircuitOpen) { + t.Fatalf("expected ErrCircuitOpen, got %v", err) + } + if attempts != 3 { + t.Fatalf("expected no network attempt after circuit open, got %d attempts", attempts) + } +} + +func TestClientCall_Bad_ContextCancellation(t *testing.T) { + attempts := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts++ + writeJSON(t, w, http.StatusOK, map[string]any{"ok": true}) + })) + defer server.Close() + + c := New(Options{URL: server.URL, Key: "test-key", HTTPClient: server.Client(), MaxAttempts: 3}) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + if _, err := c.Recall(ctx, RecallInput{Query: "cancelled"}); !core.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) + } + if attempts != 0 { + t.Fatalf("expected cancelled request to avoid network, got %d attempts", attempts) + } +} + +func readRequestBody(t *testing.T, r *http.Request) map[string]any { + t.Helper() + + readResult := core.ReadAll(r.Body) + if !readResult.OK { + t.Fatalf("failed to read body: %v", readResult.Value) + } + body := map[string]any{} + if decodeResult := core.JSONUnmarshalString(readResult.Value.(string), &body); !decodeResult.OK { + t.Fatalf("failed to decode body: %v", decodeResult.Value) + } + return body +} + +func writeJSON(t *testing.T, w http.ResponseWriter, status int, payload any) { + t.Helper() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if _, err := w.Write([]byte(core.JSONMarshalString(payload))); err != nil { + t.Fatalf("failed to write response: %v", err) + } +} diff --git a/pkg/mcp/brain/direct.go b/pkg/mcp/brain/direct.go index b031fbe..0d56418 100644 --- a/pkg/mcp/brain/direct.go +++ b/pkg/mcp/brain/direct.go @@ -4,14 +4,11 @@ package brain import ( "context" - "net/http" - "net/url" "time" core "dappco.re/go/core" - coreio "dappco.re/go/core/io" - coreerr "dappco.re/go/core/log" coremcp "dappco.re/go/mcp/pkg/mcp" + brainclient "dappco.re/go/mcp/pkg/mcp/brain/client" "github.com/modelcontextprotocol/go-sdk/mcp" ) @@ -24,9 +21,7 @@ type channelSender func(ctx context.Context, channel string, data any) // Unlike Subsystem (which uses the IDE WebSocket bridge), this calls the // Laravel API directly — suitable for standalone core-mcp usage. type DirectSubsystem struct { - apiURL string - apiKey string - client *http.Client + apiClient *brainclient.Client onChannel channelSender } @@ -53,24 +48,17 @@ func (s *DirectSubsystem) OnChannel(fn func(ctx context.Context, channel string, // Reads CORE_BRAIN_URL and CORE_BRAIN_KEY from environment, or falls back // to ~/.claude/brain.key for the API key. func NewDirect() *DirectSubsystem { - apiURL := core.Env("CORE_BRAIN_URL") - if apiURL == "" { - apiURL = "https://api.lthn.sh" - } + return NewDirectWithClient(brainclient.NewFromEnvironment()) +} - apiKey := core.Env("CORE_BRAIN_KEY") - if apiKey == "" { - home := core.Env("HOME") - if data, err := coreio.Local.Read(core.Path(home, ".claude", "brain.key")); err == nil { - apiKey = core.Trim(data) - } - } - - return &DirectSubsystem{ - apiURL: apiURL, - apiKey: apiKey, - client: &http.Client{Timeout: 30 * time.Second}, +// NewDirectWithClient creates a direct brain subsystem using the shared client. +// +// brain := NewDirectWithClient(client.New(client.Options{URL: "http://127.0.0.1:8080", Key: "test"})) +func NewDirectWithClient(apiClient *brainclient.Client) *DirectSubsystem { + if apiClient == nil { + apiClient = brainclient.NewFromEnvironment() } + return &DirectSubsystem{apiClient: apiClient} } // Name implements mcp.Subsystem. @@ -104,58 +92,19 @@ func (s *DirectSubsystem) RegisterTools(svc *coremcp.Service) { func (s *DirectSubsystem) Shutdown(_ context.Context) error { return nil } func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body any) (map[string]any, error) { - if s.apiKey == "" { - return nil, coreerr.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil) - } - - var bodyStr string - if body != nil { - bodyStr = core.JSONMarshalString(body) - } - - req, err := http.NewRequestWithContext(ctx, method, s.apiURL+path, core.NewReader(bodyStr)) - if err != nil { - return nil, coreerr.E("brain.apiCall", "create request", err) - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "application/json") - req.Header.Set("Authorization", "Bearer "+s.apiKey) - - resp, err := s.client.Do(req) - if err != nil { - return nil, coreerr.E("brain.apiCall", "API call failed", err) - } - defer resp.Body.Close() - - r := core.ReadAll(resp.Body) - if !r.OK { - if readErr, ok := r.Value.(error); ok { - return nil, coreerr.E("brain.apiCall", "read response", readErr) - } - return nil, coreerr.E("brain.apiCall", "read response failed", nil) - } - respData := r.Value.(string) - - if resp.StatusCode >= 400 { - return nil, coreerr.E("brain.apiCall", "API returned "+respData, nil) - } - - var result map[string]any - if ur := core.JSONUnmarshal([]byte(respData), &result); !ur.OK { - return nil, coreerr.E("brain.apiCall", "parse response", nil) - } - - return result, nil + return s.client().Call(ctx, method, path, body) } func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, input RememberInput) (*mcp.CallToolResult, RememberOutput, error) { - result, err := s.apiCall(ctx, "POST", "/v1/brain/remember", map[string]any{ - "content": input.Content, - "type": input.Type, - "tags": input.Tags, - "org": input.Org, - "project": input.Project, - "agent_id": "cladius", + result, err := s.client().Remember(ctx, brainclient.RememberInput{ + Content: input.Content, + Type: input.Type, + Tags: input.Tags, + Org: input.Org, + Project: input.Project, + Confidence: input.Confidence, + Supersedes: input.Supersedes, + ExpiresIn: input.ExpiresIn, }) if err != nil { return nil, RememberOutput{}, err @@ -178,25 +127,15 @@ func (s *DirectSubsystem) remember(ctx context.Context, _ *mcp.CallToolRequest, } func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, input RecallInput) (*mcp.CallToolResult, RecallOutput, error) { - body := map[string]any{ - "query": input.Query, - "top_k": input.TopK, - "agent_id": "cladius", - } - if input.Filter.Project != "" { - body["project"] = input.Filter.Project - } - if input.Filter.Org != "" { - body["org"] = input.Filter.Org - } - if input.Filter.Type != nil { - body["type"] = input.Filter.Type - } - if input.TopK == 0 { - body["top_k"] = 10 - } - - result, err := s.apiCall(ctx, "POST", "/v1/brain/recall", body) + result, err := s.client().Recall(ctx, brainclient.RecallInput{ + Query: input.Query, + TopK: input.TopK, + Org: input.Filter.Org, + Project: input.Filter.Project, + Type: input.Filter.Type, + AgentID: input.Filter.AgentID, + MinConfidence: input.Filter.MinConfidence, + }) if err != nil { return nil, RecallOutput{}, err } @@ -219,7 +158,7 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in } func (s *DirectSubsystem) forget(ctx context.Context, _ *mcp.CallToolRequest, input ForgetInput) (*mcp.CallToolResult, ForgetOutput, error) { - _, err := s.apiCall(ctx, "DELETE", "/v1/brain/forget/"+input.ID, nil) + _, err := s.client().Forget(ctx, brainclient.ForgetInput{ID: input.ID, Reason: input.Reason}) if err != nil { return nil, ForgetOutput{}, err } @@ -243,23 +182,13 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu if limit == 0 { limit = 50 } - - values := url.Values{} - if input.Org != "" { - values.Set("org", input.Org) - } - if input.Project != "" { - values.Set("project", input.Project) - } - if input.Type != "" { - values.Set("type", input.Type) - } - if input.AgentID != "" { - values.Set("agent_id", input.AgentID) - } - values.Set("limit", core.Sprintf("%d", limit)) - - result, err := s.apiCall(ctx, http.MethodGet, "/v1/brain/list?"+values.Encode(), nil) + result, err := s.client().List(ctx, brainclient.ListInput{ + Org: input.Org, + Project: input.Project, + Type: input.Type, + AgentID: input.AgentID, + Limit: limit, + }) if err != nil { return nil, ListOutput{}, err } @@ -283,6 +212,13 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu }, nil } +func (s *DirectSubsystem) client() *brainclient.Client { + if s.apiClient == nil { + s.apiClient = brainclient.NewFromEnvironment() + } + return s.apiClient +} + // memoriesFromResult extracts Memory entries from an API response map. func memoriesFromResult(result map[string]any) []Memory { var memories []Memory diff --git a/pkg/mcp/brain/direct_test.go b/pkg/mcp/brain/direct_test.go index 6e13682..f28464e 100644 --- a/pkg/mcp/brain/direct_test.go +++ b/pkg/mcp/brain/direct_test.go @@ -8,14 +8,21 @@ import ( "net/http" "net/http/httptest" "testing" + "time" + + brainclient "dappco.re/go/mcp/pkg/mcp/brain/client" ) // newTestDirect creates a DirectSubsystem pointing at a test server. func newTestDirect(url string) *DirectSubsystem { return &DirectSubsystem{ - apiURL: url, - apiKey: "test-key", - client: http.DefaultClient, + apiClient: brainclient.New(brainclient.Options{ + URL: url, + Key: "test-key", + HTTPClient: http.DefaultClient, + MaxAttempts: 1, + BaseDelay: time.Nanosecond, + }), } } @@ -84,7 +91,12 @@ func TestApiCall_Good_GetNilBody(t *testing.T) { } func TestApiCall_Bad_NoApiKey(t *testing.T) { - s := &DirectSubsystem{apiKey: "", client: http.DefaultClient} + s := &DirectSubsystem{apiClient: brainclient.New(brainclient.Options{ + URL: "http://example.test", + Key: "", + HTTPClient: http.DefaultClient, + MaxAttempts: 1, + })} _, err := s.apiCall(context.Background(), "GET", "/test", nil) if err == nil { t.Error("expected error when apiKey is empty") @@ -121,9 +133,12 @@ func TestApiCall_Bad_InvalidJson(t *testing.T) { func TestApiCall_Bad_Unreachable(t *testing.T) { s := &DirectSubsystem{ - apiURL: "http://127.0.0.1:1", // nothing listening - apiKey: "key", - client: http.DefaultClient, + apiClient: brainclient.New(brainclient.Options{ + URL: "http://127.0.0.1:1", // nothing listening + Key: "key", + HTTPClient: http.DefaultClient, + MaxAttempts: 1, + }), } _, err := s.apiCall(context.Background(), "GET", "/test", nil) if err == nil {