Compare commits

..

12 commits

Author SHA1 Message Date
Snider
91297d733d fix(mcp): rewrite mcpcmd for new core/cli Command API + correct bridge test
The mcpcmd package was using the removed Cobra-style cli.Command API
(Use/Short/Long/RunE/StringFlag/AddCommand). Rewrites it to the current
core.Command{Description, Action, Flags} path-routed pattern so the
core-mcp binary compiles again. Registers both "mcp" and "mcp/serve"
for parity with the existing OnStartup service-mode flow.

Fixes the bridge DescribableGroup test that expected len == svc.Tools()
but ToolBridge.Describe prepends the GET tool-listing entry, so the
correct expectation is len + 1.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-14 18:01:17 +01:00
Snider
ca07b6cd62 feat(mcp): RFC §3 tools + §8 discovery alignment
New tools (RFC §3):
- webview_render / webview_update: embedded UI HTML + state broadcast
  via webview.render / webview.update channels with merge-or-replace
- ws_connect / ws_send / ws_close: outbound WebSocket client tools
  with stable ws-<hex> connection IDs
- process_run: blocking command executor returning ID/exit/output
- rag_search / rag_index: aliases for rag_query / rag_ingest per spec
- rag_retrieve: fetch chunks for a source, ordered by chunk index
- ide_dashboard_state / ide_dashboard_update: merge-or-replace state
  with activity feed entries and dashboard.state.updated broadcast
- agentic_issue_dispatch: spec-aligned name for agentic_dispatch_issue

Discovery (RFC §8.2):
- transport_http.go: /.well-known/mcp-servers.json advertises both
  core-agent and core-mcp with semantic use_when hints

Tool count: 25 → 33. Good/Bad/Ugly coverage added for every new tool.

Pre-existing cmd/mcpcmd Cobra-style build error flagged but untouched
— same cmd vs core.Command migration pattern seen in cmd/api and
cmd/build (which were migrated earlier this session).

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-14 15:55:15 +01:00
Snider
92727025e7 fix(mcp): transport addr default + progress notifications + auth timing safety
- transport_http.go: addr "" now defaults to 127.0.0.1:9101 per RFC
- pkg/mcp/agentic/dispatch.go: emits NotifyProgress milestones during
  validation, workspace prep, queue/slot, spawn, start completion
- pkg/mcp/agentic/watch.go: emits NotifyProgress per watched workspace
  completion/failure with running totals
- pkg/mcp/authz.go: restore crypto/subtle for constant-time token
  comparison (timing-attack resistance)
- pkg/mcp/registry.go: related touch-up for the auth path

Spark-medium pass. Unused net/http import cleaned after verify.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-14 14:37:52 +01:00
Snider
d8144fde09 refactor: AX compliance sweep — replace banned stdlib imports with core primitives
Replaced fmt, strings, sort, os, io, sync, encoding/json, path/filepath,
errors, log, reflect with core.Sprintf, core.E, core.Contains, core.Trim,
core.Split, core.Join, core.JoinPath, slices.Sort, c.Fs(), c.Lock(),
core.JSONMarshal, core.ReadAll and other CoreGO v0.8.0 primitives.

Framework boundary exceptions preserved where stdlib types are required
by external interfaces (Gin, net/http, CGo, Wails, bubbletea).

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-13 09:32:00 +01:00
Snider
f9c5362151 feat(mcp): export NotifySession for raw JSON-RPC notifications
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-09 11:07:28 +01:00
Snider
8f3afaa42a refactor(mcp): migrate stdlib imports to core/go primitives + upgrade go-sdk v1.5.0
- Replace fmt/errors/strings/path/filepath with core.Sprintf, core.E,
  core.Contains, core.Path etc. across 16 files
- Remove 'errors' import from bridge.go (core.Is/core.As)
- Remove 'fmt' from transport_tcp.go, ide.go (core.Print, inline interface)
- Remove 'strings' from notify.go, transport_http.go, tools_webview.go,
  process_notifications.go (core.Trim, core.HasPrefix, core.Lower etc.)
- Upgrade go-sdk from v1.4.1 to v1.5.0
- Keep encoding/json for json.NewDecoder/MarshalIndent (no core equivalent)
- Keep os/exec in agentic subsystem (needs go-process Action wiring)

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-08 22:03:52 +01:00
Snider
f8f137b465 fix(mcp): disable ListChanged to prevent premature stdio notifications
The go-sdk fires notifications/tools/list_changed and
notifications/resources/list_changed with 10ms delay after AddTool/AddResource.
Since all registration happens before server.Run(), these hit stdout
before the client sends initialize, breaking the MCP handshake.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-08 20:50:46 +01:00
Snider
429f1c2b6c Revert "perf(mcp): gate extended built-in tools behind CORE_MCP_FULL"
This reverts commit 9f7dd84d4a.
2026-04-08 20:47:34 +01:00
Snider
9f7dd84d4a perf(mcp): gate extended built-in tools behind CORE_MCP_FULL
Metrics, RAG, and webview tools only register when CORE_MCP_FULL=1.
Process and WS tools always register (used by factory).
Reduces default tool count by 15.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-08 19:17:32 +01:00
Snider
9bd3084da4 fix(mcp): bridge test body + process dep resolution
- Fix TestBridgeToAPI_Good_EndToEnd: POST with empty JSON body instead of nil
- Add local replace for go-process to resolve API drift with core v0.8.0

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-08 16:39:28 +01:00
Snider
20e4a381cf fix: migrate module paths from forge.lthn.ai to dappco.re
Update all import paths and version pins:
- forge.lthn.ai/core/go-* → dappco.re/go/core/*
- forge.lthn.ai/core/api → dappco.re/go/core/api
- forge.lthn.ai/core/cli → dappco.re/go/core/cli
- Updated: api v0.3.0, cli v0.5.2, ai v0.2.2, io v0.4.1, log v0.1.2
- Updated: process v0.5.0, rag v0.1.13, ws v0.4.0, webview v0.2.1
- Updated: i18n v0.2.3, inference v0.3.0, scm v0.6.1

Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-07 12:59:22 +01:00
Snider
cd305904e5 fix: migrate module paths from forge.lthn.ai to dappco.re
Co-Authored-By: Virgil <virgil@lethean.io>
2026-04-04 16:21:14 +01:00
60 changed files with 2859 additions and 588 deletions

View file

@ -27,8 +27,8 @@ import (
"strings" "strings"
"time" "time"
coreio "forge.lthn.ai/core/go-io" coreio "dappco.re/go/core/io"
coreerr "forge.lthn.ai/core/go-log" coreerr "dappco.re/go/core/log"
) )
var ( var (

View file

@ -1,7 +1,7 @@
package main package main
import ( import (
"forge.lthn.ai/core/cli/pkg/cli" "dappco.re/go/core/cli/pkg/cli"
mcpcmd "dappco.re/go/mcp/cmd/mcpcmd" mcpcmd "dappco.re/go/mcp/cmd/mcpcmd"
) )

View file

@ -1,7 +1,14 @@
// Package mcpcmd provides the MCP server command. // SPDX-License-Identifier: EUPL-1.2
// Package mcpcmd registers the `mcp` and `mcp serve` CLI commands.
//
// Wiring example:
//
// cli.Main(cli.WithCommands("mcp", mcpcmd.AddMCPCommands))
// //
// Commands: // Commands:
// - mcp serve: Start the MCP server for AI tool integration // - mcp Start the MCP server on stdio (default transport).
// - mcp serve Start the MCP server with auto-selected transport.
package mcpcmd package mcpcmd
import ( import (
@ -10,75 +17,89 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
core "dappco.re/go/core"
"dappco.re/go/mcp/pkg/mcp" "dappco.re/go/mcp/pkg/mcp"
"dappco.re/go/mcp/pkg/mcp/agentic" "dappco.re/go/mcp/pkg/mcp/agentic"
"dappco.re/go/mcp/pkg/mcp/brain" "dappco.re/go/mcp/pkg/mcp/brain"
"forge.lthn.ai/core/cli/pkg/cli"
) )
var workspaceFlag string // newMCPService is the service constructor, indirected for tests.
var unrestrictedFlag bool
var newMCPService = mcp.New var newMCPService = mcp.New
// runMCPService starts the MCP server, indirected for tests.
var runMCPService = func(svc *mcp.Service, ctx context.Context) error { var runMCPService = func(svc *mcp.Service, ctx context.Context) error {
return svc.Run(ctx) return svc.Run(ctx)
} }
// shutdownMCPService performs graceful shutdown, indirected for tests.
var shutdownMCPService = func(svc *mcp.Service, ctx context.Context) error { var shutdownMCPService = func(svc *mcp.Service, ctx context.Context) error {
return svc.Shutdown(ctx) return svc.Shutdown(ctx)
} }
var mcpCmd = &cli.Command{ // workspaceFlag mirrors the --workspace CLI flag value.
Use: "mcp", var workspaceFlag string
Short: "MCP server for AI tool integration",
Long: "Model Context Protocol (MCP) server providing file operations, RAG, and metrics tools.", // unrestrictedFlag mirrors the --unrestricted CLI flag value.
var unrestrictedFlag bool
// AddMCPCommands registers the `mcp` command tree on the Core instance.
//
// cli.Main(cli.WithCommands("mcp", mcpcmd.AddMCPCommands))
func AddMCPCommands(c *core.Core) {
c.Command("mcp", core.Command{
Description: "Model Context Protocol server (stdio, TCP, Unix socket, HTTP).",
Action: runServeAction,
Flags: core.NewOptions(
core.Option{Key: "workspace", Value: ""},
core.Option{Key: "w", Value: ""},
core.Option{Key: "unrestricted", Value: false},
),
})
c.Command("mcp/serve", core.Command{
Description: "Start the MCP server with auto-selected transport (stdio, TCP, Unix, or HTTP).",
Action: runServeAction,
Flags: core.NewOptions(
core.Option{Key: "workspace", Value: ""},
core.Option{Key: "w", Value: ""},
core.Option{Key: "unrestricted", Value: false},
),
})
} }
var serveCmd = &cli.Command{ // runServeAction is the CLI entrypoint for `mcp` and `mcp serve`.
Use: "serve", //
Short: "Start the MCP server", // opts := core.NewOptions(core.Option{Key: "workspace", Value: "."})
Long: `Start the MCP server on stdio (default), TCP, Unix socket, or HTTP. // result := runServeAction(opts)
func runServeAction(opts core.Options) core.Result {
workspaceFlag = core.Trim(firstNonEmpty(opts.String("workspace"), opts.String("w")))
unrestrictedFlag = opts.Bool("unrestricted")
The server provides file operations plus the brain and agentic subsystems if err := runServe(); err != nil {
registered by this command. return core.Result{Value: err, OK: false}
}
Environment variables: return core.Result{OK: true}
MCP_ADDR TCP address to listen on (e.g., "localhost:9999")
MCP_UNIX_SOCKET
Unix socket path to listen on (e.g., "/tmp/core-mcp.sock")
Selected after MCP_ADDR and before stdio.
MCP_HTTP_ADDR
HTTP address to listen on (e.g., "127.0.0.1:9101")
Selected before MCP_ADDR and stdio.
Examples:
# Start with stdio transport (for Claude Code integration)
core mcp serve
# Start with workspace restriction
core mcp serve --workspace /path/to/project
# Start unrestricted (explicit opt-in)
core mcp serve --unrestricted
# Start TCP server
MCP_ADDR=localhost:9999 core mcp serve`,
RunE: func(cmd *cli.Command, args []string) error {
return runServe()
},
} }
func initFlags() { // firstNonEmpty returns the first non-empty string argument.
cli.StringFlag(serveCmd, &workspaceFlag, "workspace", "w", "", "Restrict file operations to this directory") //
cli.BoolFlag(serveCmd, &unrestrictedFlag, "unrestricted", "", false, "Disable filesystem sandboxing entirely") // firstNonEmpty("", "foo") == "foo"
} // firstNonEmpty("bar", "baz") == "bar"
func firstNonEmpty(values ...string) string {
// AddMCPCommands registers the 'mcp' command and all subcommands. for _, v := range values {
func AddMCPCommands(root *cli.Command) { if v != "" {
initFlags() return v
mcpCmd.AddCommand(serveCmd) }
root.AddCommand(mcpCmd) }
return ""
} }
// runServe wires the MCP service together and blocks until the context is
// cancelled by SIGINT/SIGTERM or a transport error.
//
// if err := runServe(); err != nil {
// core.Error("mcp serve failed", "err", err)
// }
func runServe() error { func runServe() error {
opts := mcp.Options{} opts := mcp.Options{}
@ -88,22 +109,20 @@ func runServe() error {
opts.WorkspaceRoot = workspaceFlag opts.WorkspaceRoot = workspaceFlag
} }
// Register OpenBrain and agentic subsystems // Register OpenBrain and agentic subsystems.
opts.Subsystems = []mcp.Subsystem{ opts.Subsystems = []mcp.Subsystem{
brain.NewDirect(), brain.NewDirect(),
agentic.NewPrep(), agentic.NewPrep(),
} }
// Create the MCP service
svc, err := newMCPService(opts) svc, err := newMCPService(opts)
if err != nil { if err != nil {
return cli.Wrap(err, "create MCP service") return core.E("mcpcmd.runServe", "create MCP service", err)
} }
defer func() { defer func() {
_ = shutdownMCPService(svc, context.Background()) _ = shutdownMCPService(svc, context.Background())
}() }()
// Set up signal handling for clean shutdown
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -111,10 +130,12 @@ func runServe() error {
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() { go func() {
<-sigCh select {
case <-sigCh:
cancel() cancel()
case <-ctx.Done():
}
}() }()
// Run the server (blocks until context cancelled or error)
return runMCPService(svc, ctx) return runMCPService(svc, ctx)
} }

View file

@ -1,26 +1,18 @@
// SPDX-License-Identifier: EUPL-1.2
package mcpcmd package mcpcmd
import ( import (
"context" "context"
"testing" "testing"
core "dappco.re/go/core"
"dappco.re/go/mcp/pkg/mcp" "dappco.re/go/mcp/pkg/mcp"
) )
func TestRunServe_Good_ShutsDownService(t *testing.T) { func TestCmdMCP_RunServe_Good_ShutsDownService(t *testing.T) {
oldNew := newMCPService restore := stubMCPService(t)
oldRun := runMCPService defer restore()
oldShutdown := shutdownMCPService
oldWorkspace := workspaceFlag
oldUnrestricted := unrestrictedFlag
t.Cleanup(func() {
newMCPService = oldNew
runMCPService = oldRun
shutdownMCPService = oldShutdown
workspaceFlag = oldWorkspace
unrestrictedFlag = oldUnrestricted
})
workspaceFlag = "" workspaceFlag = ""
unrestrictedFlag = false unrestrictedFlag = false
@ -50,3 +42,186 @@ func TestRunServe_Good_ShutsDownService(t *testing.T) {
t.Fatal("expected shutdownMCPService to be called") t.Fatal("expected shutdownMCPService to be called")
} }
} }
func TestCmdMCP_RunServeAction_Good_PropagatesFlags(t *testing.T) {
restore := stubMCPService(t)
defer restore()
workspaceFlag = ""
unrestrictedFlag = false
var gotOpts mcp.Options
newMCPService = func(opts mcp.Options) (*mcp.Service, error) {
gotOpts = opts
return mcp.New(mcp.Options{WorkspaceRoot: t.TempDir()})
}
runMCPService = func(svc *mcp.Service, ctx context.Context) error {
return nil
}
shutdownMCPService = func(svc *mcp.Service, ctx context.Context) error {
return nil
}
tmp := t.TempDir()
opts := core.NewOptions(core.Option{Key: "workspace", Value: tmp})
result := runServeAction(opts)
if !result.OK {
t.Fatalf("expected OK, got %+v", result)
}
if gotOpts.WorkspaceRoot != tmp {
t.Fatalf("expected workspace root %q, got %q", tmp, gotOpts.WorkspaceRoot)
}
if gotOpts.Unrestricted {
t.Fatal("expected Unrestricted=false when --workspace is set")
}
}
func TestCmdMCP_RunServeAction_Good_UnrestrictedFlag(t *testing.T) {
restore := stubMCPService(t)
defer restore()
workspaceFlag = ""
unrestrictedFlag = false
var gotOpts mcp.Options
newMCPService = func(opts mcp.Options) (*mcp.Service, error) {
gotOpts = opts
return mcp.New(mcp.Options{Unrestricted: true})
}
runMCPService = func(svc *mcp.Service, ctx context.Context) error {
return nil
}
shutdownMCPService = func(svc *mcp.Service, ctx context.Context) error {
return nil
}
opts := core.NewOptions(core.Option{Key: "unrestricted", Value: true})
result := runServeAction(opts)
if !result.OK {
t.Fatalf("expected OK, got %+v", result)
}
if !gotOpts.Unrestricted {
t.Fatal("expected Unrestricted=true when --unrestricted is set")
}
}
func TestCmdMCP_RunServe_Bad_CreateServiceFails(t *testing.T) {
restore := stubMCPService(t)
defer restore()
workspaceFlag = ""
unrestrictedFlag = false
sentinel := core.E("mcpcmd.test", "boom", nil)
newMCPService = func(opts mcp.Options) (*mcp.Service, error) {
return nil, sentinel
}
runMCPService = func(svc *mcp.Service, ctx context.Context) error {
t.Fatal("runMCPService should not be called when New fails")
return nil
}
shutdownMCPService = func(svc *mcp.Service, ctx context.Context) error {
t.Fatal("shutdownMCPService should not be called when New fails")
return nil
}
err := runServe()
if err == nil {
t.Fatal("expected error when newMCPService fails")
}
}
func TestCmdMCP_RunServeAction_Bad_PropagatesFailure(t *testing.T) {
restore := stubMCPService(t)
defer restore()
workspaceFlag = ""
unrestrictedFlag = false
newMCPService = func(opts mcp.Options) (*mcp.Service, error) {
return nil, core.E("mcpcmd.test", "construction failed", nil)
}
runMCPService = func(svc *mcp.Service, ctx context.Context) error {
return nil
}
shutdownMCPService = func(svc *mcp.Service, ctx context.Context) error {
return nil
}
result := runServeAction(core.NewOptions())
if result.OK {
t.Fatal("expected runServeAction to fail when service creation fails")
}
if result.Value == nil {
t.Fatal("expected error value on failure")
}
}
func TestCmdMCP_FirstNonEmpty_Ugly_HandlesAllVariants(t *testing.T) {
tests := []struct {
name string
values []string
want string
}{
{"no args", nil, ""},
{"empty string", []string{""}, ""},
{"all empty", []string{"", "", ""}, ""},
{"first non-empty", []string{"foo", "bar"}, "foo"},
{"skip empty", []string{"", "baz"}, "baz"},
{"mixed", []string{"", "", "last"}, "last"},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := firstNonEmpty(tc.values...)
if got != tc.want {
t.Fatalf("firstNonEmpty(%v) = %q, want %q", tc.values, got, tc.want)
}
})
}
}
func TestCmdMCP_AddMCPCommands_Good_RegistersMcpTree(t *testing.T) {
c := core.New()
AddMCPCommands(c)
commands := c.Commands()
if len(commands) == 0 {
t.Fatal("expected at least one registered command")
}
mustHave := map[string]bool{
"mcp": false,
"mcp/serve": false,
}
for _, path := range commands {
if _, ok := mustHave[path]; ok {
mustHave[path] = true
}
}
for path, present := range mustHave {
if !present {
t.Fatalf("expected command %q to be registered", path)
}
}
}
// stubMCPService captures the package-level function pointers and returns a
// restore hook so each test can mutate them without leaking into siblings.
func stubMCPService(t *testing.T) func() {
t.Helper()
oldNew := newMCPService
oldRun := runMCPService
oldShutdown := shutdownMCPService
oldWorkspace := workspaceFlag
oldUnrestricted := unrestrictedFlag
return func() {
newMCPService = oldNew
runMCPService = oldRun
shutdownMCPService = oldShutdown
workspaceFlag = oldWorkspace
unrestrictedFlag = oldUnrestricted
}
}

27
go.mod
View file

@ -4,26 +4,25 @@ go 1.26.0
require ( require (
dappco.re/go/core v0.8.0-alpha.1 dappco.re/go/core v0.8.0-alpha.1
forge.lthn.ai/core/api v0.1.5 dappco.re/go/core/ai v0.2.2
forge.lthn.ai/core/cli v0.3.7 dappco.re/go/core/api v0.3.0
forge.lthn.ai/core/go-ai v0.1.12 dappco.re/go/core/cli v0.5.2
forge.lthn.ai/core/go-io v0.1.7 dappco.re/go/core/io v0.4.1
forge.lthn.ai/core/go-log v0.0.4 dappco.re/go/core/log v0.1.2
forge.lthn.ai/core/go-process v0.2.9 dappco.re/go/core/process v0.5.0
forge.lthn.ai/core/go-rag v0.1.11 dappco.re/go/core/rag v0.1.13
forge.lthn.ai/core/go-webview v0.1.6 dappco.re/go/core/webview v0.2.1
forge.lthn.ai/core/go-ws v0.2.5 dappco.re/go/core/ws v0.4.0
github.com/gin-gonic/gin v1.12.0 github.com/gin-gonic/gin v1.12.0
github.com/gorilla/websocket v1.5.3 github.com/gorilla/websocket v1.5.3
github.com/modelcontextprotocol/go-sdk v1.4.1 github.com/modelcontextprotocol/go-sdk v1.5.0
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
) )
require ( require (
forge.lthn.ai/core/go v0.3.3 // indirect dappco.re/go/core/i18n v0.2.3 // indirect
forge.lthn.ai/core/go-i18n v0.1.7 // indirect dappco.re/go/core/inference v0.3.0 // indirect
forge.lthn.ai/core/go-inference v0.1.6 // indirect
github.com/99designs/gqlgen v0.17.88 // indirect github.com/99designs/gqlgen v0.17.88 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect github.com/KyleBanks/depth v1.2.1 // indirect
github.com/agnivade/levenshtein v1.2.1 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect
@ -149,3 +148,5 @@ require (
google.golang.org/grpc v1.79.2 // indirect google.golang.org/grpc v1.79.2 // indirect
google.golang.org/protobuf v1.36.11 // indirect google.golang.org/protobuf v1.36.11 // indirect
) )
replace dappco.re/go/core/process => ../go-process

48
go.sum
View file

@ -1,29 +1,25 @@
dappco.re/go/core v0.8.0-alpha.1 h1:gj7+Scv+L63Z7wMxbJYHhaRFkHJo2u4MMPuUSv/Dhtk= dappco.re/go/core v0.8.0-alpha.1 h1:gj7+Scv+L63Z7wMxbJYHhaRFkHJo2u4MMPuUSv/Dhtk=
dappco.re/go/core v0.8.0-alpha.1/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A= dappco.re/go/core v0.8.0-alpha.1/go.mod h1:f2/tBZ3+3IqDrg2F5F598llv0nmb/4gJVCFzM5geE4A=
forge.lthn.ai/core/api v0.1.5 h1:NwZrcOyBjaiz5/cn0n0tnlMUodi8Or6FHMx59C7Kv2o= dappco.re/go/core/ai v0.2.2 h1:fkSKm3ezAljYbghlax5qHDm11uq7LUyIedIQO1PtdcY=
forge.lthn.ai/core/api v0.1.5/go.mod h1:PBnaWyOVXSOGy+0x2XAPUFMYJxQ2CNhppia/D06ZPII= dappco.re/go/core/ai v0.2.2/go.mod h1:+MZN/EArn/W2ag91McL034WxdMSO4IPqFcQER5/POGU=
forge.lthn.ai/core/cli v0.3.7 h1:1GrbaGg0wDGHr6+klSbbGyN/9sSbHvFbdySJznymhwg= dappco.re/go/core/api v0.3.0 h1:uWYgDQ+B4e5pXPX3S5lMsqSJamfpui3LWD5hcdwvWew=
forge.lthn.ai/core/cli v0.3.7/go.mod h1:DBUppJkA9P45ZFGgI2B8VXw1rAZxamHoI/KG7fRvTNs= dappco.re/go/core/api v0.3.0/go.mod h1:1ZDNwPHV6YjkUsjtC3nfLk6U4eqWlQ6qj6yT/MB8r6k=
forge.lthn.ai/core/go v0.3.3 h1:kYYZ2nRYy0/Be3cyuLJspRjLqTMxpckVyhb/7Sw2gd0= dappco.re/go/core/cli v0.5.2 h1:mo+PERo3lUytE+r3ArHr8o2nTftXjgPPsU/rn3ETXDM=
forge.lthn.ai/core/go v0.3.3/go.mod h1:Cp4ac25pghvO2iqOu59t1GyngTKVOzKB5/VPdhRi9CQ= dappco.re/go/core/cli v0.5.2/go.mod h1:D4zfn3ec/hb72AWX/JWDvkW+h2WDKQcxGUrzoss7q2s=
forge.lthn.ai/core/go-ai v0.1.12 h1:OHt0bUABlyhvgxZxyMwueRoh8rS3YKWGFY6++zCAwC8= dappco.re/go/core/i18n v0.2.3 h1:GqFaTR1I0SfSEc4WtsAkgao+jp8X5qcMPqrX0eMAOrY=
forge.lthn.ai/core/go-ai v0.1.12/go.mod h1:5Pc9lszxgkO7Aj2Z3dtq4L9Xk9l/VNN+Baj1t///OCM= dappco.re/go/core/i18n v0.2.3/go.mod h1:LoyX/4fIEJO/wiHY3Q682+4P0Ob7zPemcATfwp0JBUg=
forge.lthn.ai/core/go-i18n v0.1.7 h1:aHkAoc3W8fw3RPNvw/UszQbjyFWXHszzbZgty3SwyAA= dappco.re/go/core/inference v0.3.0 h1:ANFnlVO1LEYDipeDeBgqmb8CHvOTUFhMPyfyHGqO0IY=
forge.lthn.ai/core/go-i18n v0.1.7/go.mod h1:0VDjwtY99NSj2iqwrI09h5GUsJeM9s48MLkr+/Dn4G8= dappco.re/go/core/inference v0.3.0/go.mod h1:wbRY0v6iwOoJCpTvcBFarAM08bMgpPcrF6yv3vccYoA=
forge.lthn.ai/core/go-inference v0.1.6 h1:ce42zC0zO8PuISUyAukAN1NACEdWp5wF1mRgnh5+58E= dappco.re/go/core/io v0.4.1 h1:15dm7ldhFIAuZOrBiQG6XVZDpSvCxtZsUXApwTAB3wQ=
forge.lthn.ai/core/go-inference v0.1.6/go.mod h1:jfWz+IJX55wAH98+ic6FEqqGB6/P31CHlg7VY7pxREw= dappco.re/go/core/io v0.4.1/go.mod h1:w71dukyunczLb8frT9JOd5B78PjwWQD3YAXiCt3AcPA=
forge.lthn.ai/core/go-io v0.1.7 h1:Tdb6sqh+zz1lsGJaNX9RFWM6MJ/RhSAyxfulLXrJsbk= dappco.re/go/core/log v0.1.2 h1:pQSZxKD8VycdvjNJmatXbPSq2OxcP2xHbF20zgFIiZI=
forge.lthn.ai/core/go-io v0.1.7/go.mod h1:8lRLFk4Dnp5cR/Cyzh9WclD5566TbpdRgwcH7UZLWn4= dappco.re/go/core/log v0.1.2/go.mod h1:Nkqb8gsXhZAO8VLpx7B8i1iAmohhzqA20b9Zr8VUcJs=
forge.lthn.ai/core/go-log v0.0.4 h1:KTuCEPgFmuM8KJfnyQ8vPOU1Jg654W74h8IJvfQMfv0= dappco.re/go/core/rag v0.1.13 h1:R2Q+Xw5YenT4uFemXLBu+xQYtyUIYGSmMln5/Z+nol4=
forge.lthn.ai/core/go-log v0.0.4/go.mod h1:r14MXKOD3LF/sI8XUJQhRk/SZHBE7jAFVuCfgkXoZPw= dappco.re/go/core/rag v0.1.13/go.mod h1:wthXtCqYEChjlGIHcJXetlgk49lPDmzG6jFWd1PEIZc=
forge.lthn.ai/core/go-process v0.2.9 h1:Wql+5TUF+lfU2oJ9I+S764MkTqJhBsuyMM0v1zsfZC4= dappco.re/go/core/webview v0.2.1 h1:rdy2sV+MS6RZsav8BiARJxtWhfx7eOAJp3b1Ynp1sYs=
forge.lthn.ai/core/go-process v0.2.9/go.mod h1:NIzZOF5IVYYCjHkcNIGcg1mZH+bzGoie4SlZUDYOKIM= dappco.re/go/core/webview v0.2.1/go.mod h1:Qdo1V/sJJwOnL0hYd3+vzVUJxWYC8eGyILZROya6KoM=
forge.lthn.ai/core/go-rag v0.1.11 h1:KXTOtnOdrx8YKmvnj0EOi2EI/+cKjE8w2PpJCQIrSd8= dappco.re/go/core/ws v0.4.0 h1:yEDV9whXyo+GWzBSjuB3NiLiH2bmBPBWD6rydwHyBn8=
forge.lthn.ai/core/go-rag v0.1.11/go.mod h1:vIlOKVD1SdqqjkJ2XQyXPuKPtiajz/STPLCaDpqOzk8= dappco.re/go/core/ws v0.4.0/go.mod h1:L1rrgW6zU+DztcVBJW2yO5Lm3rGXpyUMOA8OL9zsAok=
forge.lthn.ai/core/go-webview v0.1.6 h1:szXQxRJf2bOZJKh3v1P01B1Vf9mgXaBCXzh0EZu9aoc=
forge.lthn.ai/core/go-webview v0.1.6/go.mod h1:5n1tECD1wBV/uFZRY9ZjfPFO5TYZrlaR3mQFwvO2nek=
forge.lthn.ai/core/go-ws v0.2.5 h1:ZIV7Yrv01R/xpJUogA5vrfP9yB9li1w7EV3eZFMt8h0=
forge.lthn.ai/core/go-ws v0.2.5/go.mod h1:C3riJyLLcV6QhLvYlq3P/XkGTsN598qQeGBoLdoHBU4=
github.com/99designs/gqlgen v0.17.88 h1:neMQDgehMwT1vYIOx/w5ZYPUU/iMNAJzRO44I5Intoc= github.com/99designs/gqlgen v0.17.88 h1:neMQDgehMwT1vYIOx/w5ZYPUU/iMNAJzRO44I5Intoc=
github.com/99designs/gqlgen v0.17.88/go.mod h1:qeqYFEgOeSKqWedOjogPizimp2iu4E23bdPvl4jTYic= github.com/99designs/gqlgen v0.17.88/go.mod h1:qeqYFEgOeSKqWedOjogPizimp2iu4E23bdPvl4jTYic=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
@ -222,8 +218,8 @@ github.com/mattn/go-localereader v0.0.1 h1:ygSAOl7ZXTx4RdPYinUpg6W99U8jWvWi9Ye2J
github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88= github.com/mattn/go-localereader v0.0.1/go.mod h1:8fBrzywKY7BI3czFoHkuzRoWE9C+EiG4R1k4Cjx5p88=
github.com/mattn/go-runewidth v0.0.21 h1:jJKAZiQH+2mIinzCJIaIG9Be1+0NR+5sz/lYEEjdM8w= github.com/mattn/go-runewidth v0.0.21 h1:jJKAZiQH+2mIinzCJIaIG9Be1+0NR+5sz/lYEEjdM8w=
github.com/mattn/go-runewidth v0.0.21/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= github.com/mattn/go-runewidth v0.0.21/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs=
github.com/modelcontextprotocol/go-sdk v1.4.1 h1:M4x9GyIPj+HoIlHNGpK2hq5o3BFhC+78PkEaldQRphc= github.com/modelcontextprotocol/go-sdk v1.5.0 h1:CHU0FIX9kpueNkxuYtfYQn1Z0slhFzBZuq+x6IiblIU=
github.com/modelcontextprotocol/go-sdk v1.4.1/go.mod h1:Bo/mS87hPQqHSRkMv4dQq1XCu6zv4INdXnFZabkNU6s= github.com/modelcontextprotocol/go-sdk v1.5.0/go.mod h1:gggDIhoemhWs3BGkGwd1umzEXCEMMvAnhTrnbXJKKKA=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=

View file

@ -4,17 +4,15 @@ package agentic
import ( import (
"context" "context"
"fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"strings"
"syscall" "syscall"
"time" "time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -54,7 +52,7 @@ func (s *PrepSubsystem) registerDispatchTool(svc *coremcp.Service) {
// agentCommand returns the command and args for a given agent type. // agentCommand returns the command and args for a given agent type.
// Supports model variants: "gemini", "gemini:flash", "gemini:pro", "claude", "claude:haiku". // Supports model variants: "gemini", "gemini:flash", "gemini:pro", "claude", "claude:haiku".
func agentCommand(agent, prompt string) (string, []string, error) { func agentCommand(agent, prompt string) (string, []string, error) {
parts := strings.SplitN(agent, ":", 2) parts := core.SplitN(agent, ":", 2)
base := parts[0] base := parts[0]
model := "" model := ""
if len(parts) > 1 { if len(parts) > 1 {
@ -78,7 +76,7 @@ func agentCommand(agent, prompt string) (string, []string, error) {
return "claude", args, nil return "claude", args, nil
case "local": case "local":
home, _ := os.UserHomeDir() home, _ := os.UserHomeDir()
script := filepath.Join(home, "Code", "core", "agent", "scripts", "local-agent.sh") script := core.Path(home, "Code", "core", "agent", "scripts", "local-agent.sh")
return "bash", []string{script, prompt}, nil return "bash", []string{script, prompt}, nil
default: default:
return "", nil, coreerr.E("agentCommand", "unknown agent: "+agent, nil) return "", nil, coreerr.E("agentCommand", "unknown agent: "+agent, nil)
@ -86,6 +84,25 @@ func agentCommand(agent, prompt string) (string, []string, error) {
} }
func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, input DispatchInput) (*mcp.CallToolResult, DispatchOutput, error) { func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest, input DispatchInput) (*mcp.CallToolResult, DispatchOutput, error) {
progressToken := any(nil)
if req != nil && req.Params != nil {
progressToken = req.Params.GetProgressToken()
}
sendProgress := func(progress float64, total float64, message string) {
if req == nil || req.Session == nil || progressToken == nil {
return
}
_ = req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
ProgressToken: progressToken,
Progress: progress,
Total: total,
Message: message,
})
}
const dispatchProgressTotal = 4
if input.Repo == "" { if input.Repo == "" {
return nil, DispatchOutput{}, coreerr.E("dispatch", "repo is required", nil) return nil, DispatchOutput{}, coreerr.E("dispatch", "repo is required", nil)
} }
@ -102,7 +119,10 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
input.Template = "coding" input.Template = "coding"
} }
sendProgress(1, dispatchProgressTotal, "validated dispatch request")
// Step 1: Prep the sandboxed workspace // Step 1: Prep the sandboxed workspace
sendProgress(2, dispatchProgressTotal, "preparing workspace")
prepInput := PrepInput{ prepInput := PrepInput{
Repo: input.Repo, Repo: input.Repo,
Org: input.Org, Org: input.Org,
@ -117,16 +137,18 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
if err != nil { if err != nil {
return nil, DispatchOutput{}, coreerr.E("dispatch", "prep workspace failed", err) return nil, DispatchOutput{}, coreerr.E("dispatch", "prep workspace failed", err)
} }
sendProgress(3, dispatchProgressTotal, core.Sprintf("workspace prepared for %s", prepOut.Branch))
wsDir := prepOut.WorkspaceDir wsDir := prepOut.WorkspaceDir
srcDir := filepath.Join(wsDir, "src") srcDir := core.Path(wsDir, "src")
// The prompt is just: read PROMPT.md and do the work // The prompt is just: read PROMPT.md and do the work
prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory." prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory."
if input.DryRun { if input.DryRun {
// Read PROMPT.md for the dry run output // Read PROMPT.md for the dry run output
promptRaw, _ := coreio.Local.Read(filepath.Join(wsDir, "PROMPT.md")) promptRaw, _ := coreio.Local.Read(core.Path(wsDir, "PROMPT.md"))
sendProgress(dispatchProgressTotal, dispatchProgressTotal, "dry run complete")
return nil, DispatchOutput{ return nil, DispatchOutput{
Success: true, Success: true,
Agent: input.Agent, Agent: input.Agent,
@ -150,6 +172,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
StartedAt: time.Now(), StartedAt: time.Now(),
Runs: 0, Runs: 0,
}) })
sendProgress(dispatchProgressTotal, dispatchProgressTotal, "queued until an agent slot is available")
return nil, DispatchOutput{ return nil, DispatchOutput{
Success: true, Success: true,
Agent: input.Agent, Agent: input.Agent,
@ -172,8 +195,10 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
StartedAt: time.Now(), StartedAt: time.Now(),
Runs: 1, Runs: 1,
}) })
sendProgress(3.5, dispatchProgressTotal, "dispatch slot acquired")
// Step 4: Spawn agent as a detached process // Step 4: Spawn agent as a detached process
sendProgress(4, dispatchProgressTotal, core.Sprintf("spawning agent %s", input.Agent))
// Uses Setpgid so the agent survives parent (MCP server) death. // Uses Setpgid so the agent survives parent (MCP server) death.
// Output goes directly to log file (not buffered in memory). // Output goes directly to log file (not buffered in memory).
command, args, err := agentCommand(input.Agent, prompt) command, args, err := agentCommand(input.Agent, prompt)
@ -181,7 +206,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
return nil, DispatchOutput{}, err return nil, DispatchOutput{}, err
} }
outputFile := filepath.Join(wsDir, fmt.Sprintf("agent-%s.log", input.Agent)) outputFile := core.Path(wsDir, core.Sprintf("agent-%s.log", input.Agent))
outFile, err := os.Create(outputFile) outFile, err := os.Create(outputFile)
if err != nil { if err != nil {
return nil, DispatchOutput{}, coreerr.E("dispatch", "failed to create log file", err) return nil, DispatchOutput{}, coreerr.E("dispatch", "failed to create log file", err)
@ -222,6 +247,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
} }
pid := cmd.Process.Pid pid := cmd.Process.Pid
sendProgress(dispatchProgressTotal, dispatchProgressTotal, "agent process started")
// Update status with PID now that agent is running // Update status with PID now that agent is running
s.saveStatus(wsDir, &WorkspaceStatus{ s.saveStatus(wsDir, &WorkspaceStatus{
@ -247,7 +273,7 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
status := "completed" status := "completed"
channel := coremcp.ChannelAgentComplete channel := coremcp.ChannelAgentComplete
payload := map[string]any{ payload := map[string]any{
"workspace": filepath.Base(wsDir), "workspace": core.PathBase(wsDir),
"repo": input.Repo, "repo": input.Repo,
"org": input.Org, "org": input.Org,
"agent": input.Agent, "agent": input.Agent,
@ -257,11 +283,11 @@ func (s *PrepSubsystem) dispatch(ctx context.Context, req *mcp.CallToolRequest,
// Update status to completed or blocked. // Update status to completed or blocked.
if st, err := readStatus(wsDir); err == nil { if st, err := readStatus(wsDir); err == nil {
st.PID = 0 st.PID = 0
if data, err := coreio.Local.Read(filepath.Join(wsDir, "src", "BLOCKED.md")); err == nil { if data, err := coreio.Local.Read(core.Path(wsDir, "src", "BLOCKED.md")); err == nil {
status = "blocked" status = "blocked"
channel = coremcp.ChannelAgentBlocked channel = coremcp.ChannelAgentBlocked
st.Status = status st.Status = status
st.Question = strings.TrimSpace(data) st.Question = core.Trim(data)
if st.Question != "" { if st.Question != "" {
payload["question"] = st.Question payload["question"] = st.Question
} }

View file

@ -6,12 +6,11 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"strings"
core "dappco.re/go/core"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -101,14 +100,14 @@ func (s *PrepSubsystem) createEpic(ctx context.Context, req *mcp.CallToolRequest
} }
// Step 2: Build epic body with checklist // Step 2: Build epic body with checklist
var body strings.Builder body := core.NewBuilder()
if input.Body != "" { if input.Body != "" {
body.WriteString(input.Body) body.WriteString(input.Body)
body.WriteString("\n\n") body.WriteString("\n\n")
} }
body.WriteString("## Tasks\n\n") body.WriteString("## Tasks\n\n")
for _, child := range children { for _, child := range children {
body.WriteString(fmt.Sprintf("- [ ] #%d %s\n", child.Number, child.Title)) body.WriteString(core.Sprintf("- [ ] #%d %s\n", child.Number, child.Title))
} }
// Step 3: Create epic issue // Step 3: Create epic issue
@ -157,8 +156,12 @@ func (s *PrepSubsystem) createIssue(ctx context.Context, org, repo, title, body
payload["labels"] = labelIDs payload["labels"] = labelIDs
} }
data, _ := json.Marshal(payload) r := core.JSONMarshal(payload)
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues", s.forgeURL, org, repo) if !r.OK {
return ChildRef{}, coreerr.E("createIssue", "failed to encode issue payload", nil)
}
data := r.Value.([]byte)
url := core.Sprintf("%s/api/v1/repos/%s/%s/issues", s.forgeURL, org, repo)
req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(data)) req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(data))
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "token "+s.forgeToken) req.Header.Set("Authorization", "token "+s.forgeToken)
@ -170,7 +173,7 @@ func (s *PrepSubsystem) createIssue(ctx context.Context, org, repo, title, body
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 201 { if resp.StatusCode != 201 {
return ChildRef{}, coreerr.E("createIssue", fmt.Sprintf("returned %d", resp.StatusCode), nil) return ChildRef{}, coreerr.E("createIssue", core.Sprintf("returned %d", resp.StatusCode), nil)
} }
var result struct { var result struct {
@ -193,7 +196,7 @@ func (s *PrepSubsystem) resolveLabelIDs(ctx context.Context, org, repo string, n
} }
// Fetch existing labels // Fetch existing labels
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/labels?limit=50", s.forgeURL, org, repo) url := core.Sprintf("%s/api/v1/repos/%s/%s/labels?limit=50", s.forgeURL, org, repo)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("Authorization", "token "+s.forgeToken) req.Header.Set("Authorization", "token "+s.forgeToken)
@ -246,12 +249,16 @@ func (s *PrepSubsystem) createLabel(ctx context.Context, org, repo, name string)
colour = "#6b7280" colour = "#6b7280"
} }
payload, _ := json.Marshal(map[string]string{ r := core.JSONMarshal(map[string]string{
"name": name, "name": name,
"color": colour, "color": colour,
}) })
if !r.OK {
return 0
}
payload := r.Value.([]byte)
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/labels", s.forgeURL, org, repo) url := core.Sprintf("%s/api/v1/repos/%s/%s/labels", s.forgeURL, org, repo)
req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "token "+s.forgeToken) req.Header.Set("Authorization", "token "+s.forgeToken)

View file

@ -3,17 +3,12 @@
package agentic package agentic
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt"
"net/http" "net/http"
"os"
"path/filepath"
"strings"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
) )
// ingestFindings reads the agent output log and creates issues via the API // ingestFindings reads the agent output log and creates issues via the API
@ -25,10 +20,7 @@ func (s *PrepSubsystem) ingestFindings(wsDir string) {
} }
// Read the log file // Read the log file
logFiles, err := filepath.Glob(filepath.Join(wsDir, "agent-*.log")) logFiles := core.PathGlob(core.Path(wsDir, "agent-*.log"))
if err != nil {
return
}
if len(logFiles) == 0 { if len(logFiles) == 0 {
return return
} }
@ -41,7 +33,7 @@ func (s *PrepSubsystem) ingestFindings(wsDir string) {
body := contentStr body := contentStr
// Skip quota errors // Skip quota errors
if strings.Contains(body, "QUOTA_EXHAUSTED") || strings.Contains(body, "QuotaError") { if core.Contains(body, "QUOTA_EXHAUSTED") || core.Contains(body, "QuotaError") {
return return
} }
@ -56,13 +48,13 @@ func (s *PrepSubsystem) ingestFindings(wsDir string) {
// Determine issue type from the template used // Determine issue type from the template used
issueType := "task" issueType := "task"
priority := "normal" priority := "normal"
if strings.Contains(body, "security") || strings.Contains(body, "Security") { if core.Contains(body, "security") || core.Contains(body, "Security") {
issueType = "bug" issueType = "bug"
priority = "high" priority = "high"
} }
// Create a single issue per repo with all findings in the body // Create a single issue per repo with all findings in the body
title := fmt.Sprintf("Scan findings for %s (%d items)", st.Repo, findings) title := core.Sprintf("Scan findings for %s (%d items)", st.Repo, findings)
// Truncate body to reasonable size for issue description // Truncate body to reasonable size for issue description
description := body description := body
@ -86,7 +78,7 @@ func countFileRefs(body string) int {
} }
if j < len(body) && body[j] == '`' { if j < len(body) && body[j] == '`' {
ref := body[i+1 : j] ref := body[i+1 : j]
if strings.Contains(ref, ".go:") || strings.Contains(ref, ".php:") { if core.Contains(ref, ".go:") || core.Contains(ref, ".php:") {
count++ count++
} }
} }
@ -102,25 +94,22 @@ func (s *PrepSubsystem) createIssueViaAPI(repo, title, description, issueType, p
} }
// Read the agent API key from file // Read the agent API key from file
home, _ := os.UserHomeDir() home := core.Env("HOME")
apiKeyData, err := coreio.Local.Read(filepath.Join(home, ".claude", "agent-api.key")) apiKeyData, err := coreio.Local.Read(core.Path(home, ".claude", "agent-api.key"))
if err != nil { if err != nil {
return false return false
} }
apiKey := strings.TrimSpace(apiKeyData) apiKey := core.Trim(apiKeyData)
payload, err := json.Marshal(map[string]string{ payloadStr := core.JSONMarshalString(map[string]string{
"title": title, "title": title,
"description": description, "description": description,
"type": issueType, "type": issueType,
"priority": priority, "priority": priority,
"reporter": "cladius", "reporter": "cladius",
}) })
if err != nil {
return false
}
req, err := http.NewRequest("POST", s.brainURL+"/v1/issues", bytes.NewReader(payload)) req, err := http.NewRequest("POST", s.brainURL+"/v1/issues", core.NewReader(payloadStr))
if err != nil { if err != nil {
return false return false
} }

View file

@ -6,11 +6,11 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
core "dappco.re/go/core"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -49,6 +49,12 @@ func (s *PrepSubsystem) registerIssueTools(svc *coremcp.Service) {
Description: "Dispatch an agent to work on a Forge issue. Assigns the issue as a lock, prepends the issue body to TODO.md, creates an issue-specific branch, and spawns the agent.", Description: "Dispatch an agent to work on a Forge issue. Assigns the issue as a lock, prepends the issue body to TODO.md, creates an issue-specific branch, and spawns the agent.",
}, s.dispatchIssue) }, s.dispatchIssue)
// agentic_issue_dispatch is the spec-aligned name for the same action.
coremcp.AddToolRecorded(svc, server, "agentic", &mcp.Tool{
Name: "agentic_issue_dispatch",
Description: "Dispatch an agent to work on a Forge issue. Spec-aligned alias for agentic_dispatch_issue.",
}, s.dispatchIssue)
coremcp.AddToolRecorded(svc, server, "agentic", &mcp.Tool{ coremcp.AddToolRecorded(svc, server, "agentic", &mcp.Tool{
Name: "agentic_pr", Name: "agentic_pr",
Description: "Create a pull request from an agent workspace. Pushes the branch and creates a Forge PR linked to the tracked issue, if any.", Description: "Create a pull request from an agent workspace. Pushes the branch and creates a Forge PR linked to the tracked issue, if any.",
@ -77,10 +83,10 @@ func (s *PrepSubsystem) dispatchIssue(ctx context.Context, req *mcp.CallToolRequ
return nil, DispatchOutput{}, err return nil, DispatchOutput{}, err
} }
if issue.State != "open" { if issue.State != "open" {
return nil, DispatchOutput{}, coreerr.E("dispatchIssue", fmt.Sprintf("issue %d is %s, not open", input.Issue, issue.State), nil) return nil, DispatchOutput{}, coreerr.E("dispatchIssue", core.Sprintf("issue %d is %s, not open", input.Issue, issue.State), nil)
} }
if issue.Assignee != nil && issue.Assignee.Login != "" { if issue.Assignee != nil && issue.Assignee.Login != "" {
return nil, DispatchOutput{}, coreerr.E("dispatchIssue", fmt.Sprintf("issue %d is already assigned to %s", input.Issue, issue.Assignee.Login), nil) return nil, DispatchOutput{}, coreerr.E("dispatchIssue", core.Sprintf("issue %d is already assigned to %s", input.Issue, issue.Assignee.Login), nil)
} }
if !input.DryRun { if !input.DryRun {
@ -124,7 +130,7 @@ func (s *PrepSubsystem) dispatchIssue(ctx context.Context, req *mcp.CallToolRequ
func (s *PrepSubsystem) unlockIssue(ctx context.Context, org, repo string, issue int, labels []struct { func (s *PrepSubsystem) unlockIssue(ctx context.Context, org, repo string, issue int, labels []struct {
Name string `json:"name"` Name string `json:"name"`
}) error { }) error {
updateURL := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue) updateURL := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue)
issueLabels := make([]string, 0, len(labels)) issueLabels := make([]string, 0, len(labels))
for _, label := range labels { for _, label := range labels {
if label.Name == "in-progress" { if label.Name == "in-progress" {
@ -135,13 +141,14 @@ func (s *PrepSubsystem) unlockIssue(ctx context.Context, org, repo string, issue
if issueLabels == nil { if issueLabels == nil {
issueLabels = []string{} issueLabels = []string{}
} }
payload, err := json.Marshal(map[string]any{ r := core.JSONMarshal(map[string]any{
"assignees": []string{}, "assignees": []string{},
"labels": issueLabels, "labels": issueLabels,
}) })
if err != nil { if !r.OK {
return coreerr.E("unlockIssue", "failed to encode issue unlock", err) return coreerr.E("unlockIssue", "failed to encode issue unlock", nil)
} }
payload := r.Value.([]byte)
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, updateURL, bytes.NewReader(payload)) req, err := http.NewRequestWithContext(ctx, http.MethodPatch, updateURL, bytes.NewReader(payload))
if err != nil { if err != nil {
@ -156,14 +163,14 @@ func (s *PrepSubsystem) unlockIssue(ctx context.Context, org, repo string, issue
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode >= http.StatusBadRequest { if resp.StatusCode >= http.StatusBadRequest {
return coreerr.E("unlockIssue", fmt.Sprintf("issue unlock returned %d", resp.StatusCode), nil) return coreerr.E("unlockIssue", core.Sprintf("issue unlock returned %d", resp.StatusCode), nil)
} }
return nil return nil
} }
func (s *PrepSubsystem) fetchIssue(ctx context.Context, org, repo string, issue int) (*forgeIssue, error) { func (s *PrepSubsystem) fetchIssue(ctx context.Context, org, repo string, issue int) (*forgeIssue, error) {
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue) url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
return nil, coreerr.E("fetchIssue", "failed to build request", err) return nil, coreerr.E("fetchIssue", "failed to build request", err)
@ -176,7 +183,7 @@ func (s *PrepSubsystem) fetchIssue(ctx context.Context, org, repo string, issue
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return nil, coreerr.E("fetchIssue", fmt.Sprintf("issue %d not found in %s/%s", issue, org, repo), nil) return nil, coreerr.E("fetchIssue", core.Sprintf("issue %d not found in %s/%s", issue, org, repo), nil)
} }
var out forgeIssue var out forgeIssue
@ -187,14 +194,15 @@ func (s *PrepSubsystem) fetchIssue(ctx context.Context, org, repo string, issue
} }
func (s *PrepSubsystem) lockIssue(ctx context.Context, org, repo string, issue int, assignee string) error { func (s *PrepSubsystem) lockIssue(ctx context.Context, org, repo string, issue int, assignee string) error {
updateURL := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue) updateURL := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue)
payload, err := json.Marshal(map[string]any{ r := core.JSONMarshal(map[string]any{
"assignees": []string{assignee}, "assignees": []string{assignee},
"labels": []string{"in-progress"}, "labels": []string{"in-progress"},
}) })
if err != nil { if !r.OK {
return coreerr.E("lockIssue", "failed to encode issue update", err) return coreerr.E("lockIssue", "failed to encode issue update", nil)
} }
payload := r.Value.([]byte)
req, err := http.NewRequestWithContext(ctx, http.MethodPatch, updateURL, bytes.NewReader(payload)) req, err := http.NewRequestWithContext(ctx, http.MethodPatch, updateURL, bytes.NewReader(payload))
if err != nil { if err != nil {
@ -209,7 +217,7 @@ func (s *PrepSubsystem) lockIssue(ctx context.Context, org, repo string, issue i
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode >= http.StatusBadRequest { if resp.StatusCode >= http.StatusBadRequest {
return coreerr.E("lockIssue", fmt.Sprintf("issue update returned %d", resp.StatusCode), nil) return coreerr.E("lockIssue", core.Sprintf("issue update returned %d", resp.StatusCode), nil)
} }
return nil return nil

View file

@ -4,12 +4,11 @@ package agentic
import ( import (
"context" "context"
"fmt"
"os/exec" "os/exec"
"path/filepath"
core "dappco.re/go/core"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -64,7 +63,7 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu
skipped := make([]string, 0) skipped := make([]string, 0)
for _, repo := range repos { for _, repo := range repos {
repoDir := filepath.Join(basePath, repo) repoDir := core.Path(basePath, repo)
if !hasRemote(repoDir, "github") { if !hasRemote(repoDir, "github") {
skipped = append(skipped, repo+": no github remote") skipped = append(skipped, repo+": no github remote")
continue continue
@ -88,7 +87,7 @@ func (s *PrepSubsystem) mirror(ctx context.Context, _ *mcp.CallToolRequest, inpu
} }
if files > maxFiles { if files > maxFiles {
sync.Skipped = fmt.Sprintf("%d files exceeds limit of %d", files, maxFiles) sync.Skipped = core.Sprintf("%d files exceeds limit of %d", files, maxFiles)
synced = append(synced, sync) synced = append(synced, sync)
continue continue
} }

View file

@ -7,13 +7,12 @@ import (
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"path/filepath"
"strings"
"time" "time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -349,11 +348,11 @@ func (s *PrepSubsystem) planList(_ context.Context, _ *mcp.CallToolRequest, inpu
var plans []Plan var plans []Plan
for _, entry := range entries { for _, entry := range entries {
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") { if entry.IsDir() || !core.HasSuffix(entry.Name(), ".json") {
continue continue
} }
id := strings.TrimSuffix(entry.Name(), ".json") id := core.TrimSuffix(entry.Name(), ".json")
plan, err := readPlan(dir, id) plan, err := readPlan(dir, id)
if err != nil { if err != nil {
continue continue
@ -422,41 +421,41 @@ func (s *PrepSubsystem) planCheckpoint(_ context.Context, _ *mcp.CallToolRequest
// --- Helpers --- // --- Helpers ---
func (s *PrepSubsystem) plansDir() string { func (s *PrepSubsystem) plansDir() string {
return filepath.Join(s.codePath, ".core", "plans") return core.Path(s.codePath, ".core", "plans")
} }
func planPath(dir, id string) string { func planPath(dir, id string) string {
return filepath.Join(dir, id+".json") return core.Path(dir, id+".json")
} }
func generatePlanID(title string) string { func generatePlanID(title string) string {
slug := strings.Map(func(r rune) rune { b := core.NewBuilder()
if r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '-' { b.Grow(len(title))
return r for _, r := range title {
switch {
case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '-':
b.WriteRune(r)
case r >= 'A' && r <= 'Z':
b.WriteRune(r + 32)
case r == ' ':
b.WriteByte('-')
} }
if r >= 'A' && r <= 'Z' {
return r + 32
} }
if r == ' ' { slug := b.String()
return '-'
}
return -1
}, title)
// Trim consecutive dashes and cap length // Collapse consecutive dashes and cap length
for strings.Contains(slug, "--") { for core.Contains(slug, "--") {
slug = strings.ReplaceAll(slug, "--", "-") slug = core.Replace(slug, "--", "-")
} }
slug = strings.Trim(slug, "-") slug = trimDashes(slug)
if len(slug) > 30 { if len(slug) > 30 {
slug = slug[:30] slug = trimDashes(slug[:30])
} }
slug = strings.TrimRight(slug, "-")
// Append short random suffix for uniqueness // Append short random suffix for uniqueness
b := make([]byte, 3) rnd := make([]byte, 3)
rand.Read(b) rand.Read(rnd)
return slug + "-" + hex.EncodeToString(b) return slug + "-" + hex.EncodeToString(rnd)
} }
func readPlan(dir, id string) (*Plan, error) { func readPlan(dir, id string) (*Plan, error) {
@ -466,8 +465,8 @@ func readPlan(dir, id string) (*Plan, error) {
} }
var plan Plan var plan Plan
if err := json.Unmarshal([]byte(data), &plan); err != nil { if r := core.JSONUnmarshal([]byte(data), &plan); !r.OK {
return nil, coreerr.E("readPlan", "failed to parse plan "+id, err) return nil, coreerr.E("readPlan", "failed to parse plan "+id, nil)
} }
return &plan, nil return &plan, nil
} }

View file

@ -6,15 +6,13 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"os/exec" "os/exec"
"path/filepath"
"strings"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -66,8 +64,8 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
return nil, CreatePROutput{}, coreerr.E("createPR", "no Forge token configured", nil) return nil, CreatePROutput{}, coreerr.E("createPR", "no Forge token configured", nil)
} }
wsDir := filepath.Join(s.workspaceRoot(), input.Workspace) wsDir := core.Path(s.workspaceRoot(), input.Workspace)
srcDir := filepath.Join(wsDir, "src") srcDir := core.Path(wsDir, "src")
if _, err := coreio.Local.List(srcDir); err != nil { if _, err := coreio.Local.List(srcDir); err != nil {
return nil, CreatePROutput{}, coreerr.E("createPR", "workspace not found: "+input.Workspace, nil) return nil, CreatePROutput{}, coreerr.E("createPR", "workspace not found: "+input.Workspace, nil)
@ -87,7 +85,7 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
if err != nil { if err != nil {
return nil, CreatePROutput{}, coreerr.E("createPR", "failed to detect branch", err) return nil, CreatePROutput{}, coreerr.E("createPR", "failed to detect branch", err)
} }
st.Branch = strings.TrimSpace(string(out)) st.Branch = core.Trim(string(out))
} }
org := st.Org org := st.Org
@ -105,7 +103,7 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
title = st.Task title = st.Task
} }
if title == "" { if title == "" {
title = fmt.Sprintf("Agent work on %s", st.Branch) title = core.Sprintf("Agent work on %s", st.Branch)
} }
// Build PR body // Build PR body
@ -143,7 +141,7 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
// Comment on issue if tracked // Comment on issue if tracked
if st.Issue > 0 { if st.Issue > 0 {
comment := fmt.Sprintf("Pull request created: %s", prURL) comment := core.Sprintf("Pull request created: %s", prURL)
s.commentOnIssue(ctx, org, st.Repo, st.Issue, comment) s.commentOnIssue(ctx, org, st.Repo, st.Issue, comment)
} }
@ -159,17 +157,17 @@ func (s *PrepSubsystem) createPR(ctx context.Context, _ *mcp.CallToolRequest, in
} }
func (s *PrepSubsystem) buildPRBody(st *WorkspaceStatus) string { func (s *PrepSubsystem) buildPRBody(st *WorkspaceStatus) string {
var b strings.Builder b := core.NewBuilder()
b.WriteString("## Summary\n\n") b.WriteString("## Summary\n\n")
if st.Task != "" { if st.Task != "" {
b.WriteString(st.Task) b.WriteString(st.Task)
b.WriteString("\n\n") b.WriteString("\n\n")
} }
if st.Issue > 0 { if st.Issue > 0 {
b.WriteString(fmt.Sprintf("Closes #%d\n\n", st.Issue)) b.WriteString(core.Sprintf("Closes #%d\n\n", st.Issue))
} }
b.WriteString(fmt.Sprintf("**Agent:** %s\n", st.Agent)) b.WriteString(core.Sprintf("**Agent:** %s\n", st.Agent))
b.WriteString(fmt.Sprintf("**Runs:** %d\n", st.Runs)) b.WriteString(core.Sprintf("**Runs:** %d\n", st.Runs))
b.WriteString("\n---\n*Created by agentic dispatch*\n") b.WriteString("\n---\n*Created by agentic dispatch*\n")
return b.String() return b.String()
} }
@ -185,7 +183,7 @@ func (s *PrepSubsystem) forgeCreatePR(ctx context.Context, org, repo, head, base
return "", 0, coreerr.E("forgeCreatePR", "failed to marshal PR payload", err) return "", 0, coreerr.E("forgeCreatePR", "failed to marshal PR payload", err)
} }
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/pulls", s.forgeURL, org, repo) url := core.Sprintf("%s/api/v1/repos/%s/%s/pulls", s.forgeURL, org, repo)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
if err != nil { if err != nil {
return "", 0, coreerr.E("forgeCreatePR", "failed to build PR request", err) return "", 0, coreerr.E("forgeCreatePR", "failed to build PR request", err)
@ -202,10 +200,10 @@ func (s *PrepSubsystem) forgeCreatePR(ctx context.Context, org, repo, head, base
if resp.StatusCode != 201 { if resp.StatusCode != 201 {
var errBody map[string]any var errBody map[string]any
if err := json.NewDecoder(resp.Body).Decode(&errBody); err != nil { if err := json.NewDecoder(resp.Body).Decode(&errBody); err != nil {
return "", 0, coreerr.E("forgeCreatePR", fmt.Sprintf("HTTP %d with unreadable error body", resp.StatusCode), err) return "", 0, coreerr.E("forgeCreatePR", core.Sprintf("HTTP %d with unreadable error body", resp.StatusCode), err)
} }
msg, _ := errBody["message"].(string) msg, _ := errBody["message"].(string)
return "", 0, coreerr.E("forgeCreatePR", fmt.Sprintf("HTTP %d: %s", resp.StatusCode, msg), nil) return "", 0, coreerr.E("forgeCreatePR", core.Sprintf("HTTP %d: %s", resp.StatusCode, msg), nil)
} }
var pr struct { var pr struct {
@ -225,7 +223,7 @@ func (s *PrepSubsystem) commentOnIssue(ctx context.Context, org, repo string, is
return return
} }
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues/%d/comments", s.forgeURL, org, repo, issue) url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d/comments", s.forgeURL, org, repo, issue)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
if err != nil { if err != nil {
return return
@ -337,7 +335,7 @@ func (s *PrepSubsystem) listPRs(ctx context.Context, _ *mcp.CallToolRequest, inp
} }
func (s *PrepSubsystem) listRepoPRs(ctx context.Context, org, repo, state string) ([]PRInfo, error) { func (s *PrepSubsystem) listRepoPRs(ctx context.Context, org, repo, state string) ([]PRInfo, error) {
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/pulls?state=%s&limit=10", url := core.Sprintf("%s/api/v1/repos/%s/%s/pulls?state=%s&limit=10",
s.forgeURL, org, repo, state) s.forgeURL, org, repo, state)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("Authorization", "token "+s.forgeToken) req.Header.Set("Authorization", "token "+s.forgeToken)
@ -348,7 +346,7 @@ func (s *PrepSubsystem) listRepoPRs(ctx context.Context, org, repo, state string
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
return nil, coreerr.E("listRepoPRs", fmt.Sprintf("HTTP %d for "+repo, resp.StatusCode), nil) return nil, coreerr.E("listRepoPRs", core.Sprintf("HTTP %d for "+repo, resp.StatusCode), nil)
} }
var prs []struct { var prs []struct {

View file

@ -8,18 +8,14 @@ import (
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"fmt"
goio "io"
"net/http" "net/http"
"os"
"os/exec" "os/exec"
"path/filepath"
"strings"
"time" "time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
@ -46,17 +42,17 @@ var (
// //
// prep := NewPrep() // prep := NewPrep()
func NewPrep() *PrepSubsystem { func NewPrep() *PrepSubsystem {
home, _ := os.UserHomeDir() home := core.Env("HOME")
forgeToken := os.Getenv("FORGE_TOKEN") forgeToken := core.Env("FORGE_TOKEN")
if forgeToken == "" { if forgeToken == "" {
forgeToken = os.Getenv("GITEA_TOKEN") forgeToken = core.Env("GITEA_TOKEN")
} }
brainKey := os.Getenv("CORE_BRAIN_KEY") brainKey := core.Env("CORE_BRAIN_KEY")
if brainKey == "" { if brainKey == "" {
if data, err := coreio.Local.Read(filepath.Join(home, ".claude", "brain.key")); err == nil { if data, err := coreio.Local.Read(core.Path(home, ".claude", "brain.key")); err == nil {
brainKey = strings.TrimSpace(data) brainKey = core.Trim(data)
} }
} }
@ -65,8 +61,8 @@ func NewPrep() *PrepSubsystem {
forgeToken: forgeToken, forgeToken: forgeToken,
brainURL: envOr("CORE_BRAIN_URL", "https://api.lthn.sh"), brainURL: envOr("CORE_BRAIN_URL", "https://api.lthn.sh"),
brainKey: brainKey, brainKey: brainKey,
specsPath: envOr("SPECS_PATH", filepath.Join(home, "Code", "host-uk", "specs")), specsPath: envOr("SPECS_PATH", core.Path(home, "Code", "host-uk", "specs")),
codePath: envOr("CODE_PATH", filepath.Join(home, "Code")), codePath: envOr("CODE_PATH", core.Path(home, "Code")),
client: &http.Client{Timeout: 30 * time.Second}, client: &http.Client{Timeout: 30 * time.Second},
} }
} }
@ -84,24 +80,24 @@ func (s *PrepSubsystem) emitChannel(ctx context.Context, channel string, data an
} }
func envOr(key, fallback string) string { func envOr(key, fallback string) string {
if v := os.Getenv(key); v != "" { if v := core.Env(key); v != "" {
return v return v
} }
return fallback return fallback
} }
func sanitizeRepoPathSegment(value, field string, allowSubdirs bool) (string, error) { func sanitizeRepoPathSegment(value, field string, allowSubdirs bool) (string, error) {
if strings.TrimSpace(value) != value { if core.Trim(value) != value {
return "", coreerr.E("prepWorkspace", field+" contains whitespace", nil) return "", coreerr.E("prepWorkspace", field+" contains whitespace", nil)
} }
if value == "" { if value == "" {
return "", nil return "", nil
} }
if strings.Contains(value, "\\") { if core.Contains(value, "\\") {
return "", coreerr.E("prepWorkspace", field+" contains invalid path separator", nil) return "", coreerr.E("prepWorkspace", field+" contains invalid path separator", nil)
} }
parts := strings.Split(value, "/") parts := core.Split(value, "/")
if !allowSubdirs && len(parts) != 1 { if !allowSubdirs && len(parts) != 1 {
return "", coreerr.E("prepWorkspace", field+" may not contain subdirectories", nil) return "", coreerr.E("prepWorkspace", field+" may not contain subdirectories", nil)
} }
@ -161,7 +157,7 @@ func (s *PrepSubsystem) Shutdown(_ context.Context) error { return nil }
// workspaceRoot returns the base directory for agent workspaces. // workspaceRoot returns the base directory for agent workspaces.
func (s *PrepSubsystem) workspaceRoot() string { func (s *PrepSubsystem) workspaceRoot() string {
return filepath.Join(s.codePath, ".core", "workspace") return core.Path(s.codePath, ".core", "workspace")
} }
// --- Input/Output types --- // --- Input/Output types ---
@ -227,8 +223,8 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques
// Workspace root: .core/workspace/{repo}-{timestamp}/ // Workspace root: .core/workspace/{repo}-{timestamp}/
wsRoot := s.workspaceRoot() wsRoot := s.workspaceRoot()
coreio.Local.EnsureDir(wsRoot) coreio.Local.EnsureDir(wsRoot)
wsName := fmt.Sprintf("%s-%d", input.Repo, time.Now().Unix()) wsName := core.Sprintf("%s-%d", input.Repo, time.Now().Unix())
wsDir := filepath.Join(wsRoot, wsName) wsDir := core.Path(wsRoot, wsName)
// Create workspace structure // Create workspace structure
// kb/ and specs/ will be created inside src/ after clone // kb/ and specs/ will be created inside src/ after clone
@ -236,10 +232,10 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques
out := PrepOutput{WorkspaceDir: wsDir} out := PrepOutput{WorkspaceDir: wsDir}
// Source repo path // Source repo path
repoPath := filepath.Join(s.codePath, "core", input.Repo) repoPath := core.Path(s.codePath, "core", input.Repo)
// 1. Clone repo into src/ and create feature branch // 1. Clone repo into src/ and create feature branch
srcDir := filepath.Join(wsDir, "src") srcDir := core.Path(wsDir, "src")
cloneCmd := exec.CommandContext(ctx, "git", "clone", repoPath, srcDir) cloneCmd := exec.CommandContext(ctx, "git", "clone", repoPath, srcDir)
if err := cloneCmd.Run(); err != nil { if err := cloneCmd.Run(); err != nil {
return nil, PrepOutput{}, coreerr.E("prepWorkspace", "failed to clone repository", err) return nil, PrepOutput{}, coreerr.E("prepWorkspace", "failed to clone repository", err)
@ -251,12 +247,12 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques
taskSlug := branchSlug(input.Task) taskSlug := branchSlug(input.Task)
if input.Issue > 0 { if input.Issue > 0 {
issueSlug := branchSlug(input.Task) issueSlug := branchSlug(input.Task)
branchName = fmt.Sprintf("agent/issue-%d", input.Issue) branchName = core.Sprintf("agent/issue-%d", input.Issue)
if issueSlug != "" { if issueSlug != "" {
branchName += "-" + issueSlug branchName += "-" + issueSlug
} }
} else if taskSlug != "" { } else if taskSlug != "" {
branchName = fmt.Sprintf("agent/%s", taskSlug) branchName = core.Sprintf("agent/%s", taskSlug)
} }
} }
if branchName != "" { if branchName != "" {
@ -269,29 +265,29 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques
} }
// Create context dirs inside src/ // Create context dirs inside src/
coreio.Local.EnsureDir(filepath.Join(srcDir, "kb")) coreio.Local.EnsureDir(core.Path(srcDir, "kb"))
coreio.Local.EnsureDir(filepath.Join(srcDir, "specs")) coreio.Local.EnsureDir(core.Path(srcDir, "specs"))
// Remote stays as local clone origin — agent cannot push to forge. // Remote stays as local clone origin — agent cannot push to forge.
// Reviewer pulls changes from workspace and pushes after verification. // Reviewer pulls changes from workspace and pushes after verification.
// 2. Copy CLAUDE.md and GEMINI.md to workspace // 2. Copy CLAUDE.md and GEMINI.md to workspace
claudeMdPath := filepath.Join(repoPath, "CLAUDE.md") claudeMdPath := core.Path(repoPath, "CLAUDE.md")
if data, err := coreio.Local.Read(claudeMdPath); err == nil { if data, err := coreio.Local.Read(claudeMdPath); err == nil {
_ = writeAtomic(filepath.Join(wsDir, "src", "CLAUDE.md"), data) _ = writeAtomic(core.Path(wsDir, "src", "CLAUDE.md"), data)
out.ClaudeMd = true out.ClaudeMd = true
} }
// Copy GEMINI.md from core/agent (ethics framework for all agents) // Copy GEMINI.md from core/agent (ethics framework for all agents)
agentGeminiMd := filepath.Join(s.codePath, "core", "agent", "GEMINI.md") agentGeminiMd := core.Path(s.codePath, "core", "agent", "GEMINI.md")
if data, err := coreio.Local.Read(agentGeminiMd); err == nil { if data, err := coreio.Local.Read(agentGeminiMd); err == nil {
_ = writeAtomic(filepath.Join(wsDir, "src", "GEMINI.md"), data) _ = writeAtomic(core.Path(wsDir, "src", "GEMINI.md"), data)
} }
// Copy persona if specified // Copy persona if specified
if persona != "" { if persona != "" {
personaPath := filepath.Join(s.codePath, "core", "agent", "prompts", "personas", persona+".md") personaPath := core.Path(s.codePath, "core", "agent", "prompts", "personas", persona+".md")
if data, err := coreio.Local.Read(personaPath); err == nil { if data, err := coreio.Local.Read(personaPath); err == nil {
_ = writeAtomic(filepath.Join(wsDir, "src", "PERSONA.md"), data) _ = writeAtomic(core.Path(wsDir, "src", "PERSONA.md"), data)
} }
} }
@ -299,9 +295,9 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques
if input.Issue > 0 { if input.Issue > 0 {
s.generateTodo(ctx, input.Org, input.Repo, input.Issue, wsDir) s.generateTodo(ctx, input.Org, input.Repo, input.Issue, wsDir)
} else if input.Task != "" { } else if input.Task != "" {
todo := fmt.Sprintf("# TASK: %s\n\n**Repo:** %s/%s\n**Status:** ready\n\n## Objective\n\n%s\n", todo := core.Sprintf("# TASK: %s\n\n**Repo:** %s/%s\n**Status:** ready\n\n## Objective\n\n%s\n",
input.Task, input.Org, input.Repo, input.Task) input.Task, input.Org, input.Repo, input.Task)
_ = writeAtomic(filepath.Join(wsDir, "src", "TODO.md"), todo) _ = writeAtomic(core.Path(wsDir, "src", "TODO.md"), todo)
} }
// 4. Generate CONTEXT.md from OpenBrain // 4. Generate CONTEXT.md from OpenBrain
@ -333,12 +329,12 @@ func (s *PrepSubsystem) prepWorkspace(ctx context.Context, _ *mcp.CallToolReques
// branchSlug converts a free-form string into a git-friendly branch suffix. // branchSlug converts a free-form string into a git-friendly branch suffix.
func branchSlug(value string) string { func branchSlug(value string) string {
value = strings.ToLower(strings.TrimSpace(value)) value = core.Lower(core.Trim(value))
if value == "" { if value == "" {
return "" return ""
} }
var b strings.Builder b := core.NewBuilder()
b.Grow(len(value)) b.Grow(len(value))
lastDash := false lastDash := false
for _, r := range value { for _, r := range value {
@ -359,14 +355,42 @@ func branchSlug(value string) string {
} }
} }
slug := strings.Trim(b.String(), "-") slug := trimDashes(b.String())
if len(slug) > 40 { if len(slug) > 40 {
slug = slug[:40] slug = trimDashes(slug[:40])
slug = strings.Trim(slug, "-")
} }
return slug return slug
} }
// sanitizeFilename replaces non-alphanumeric characters (except - _ .) with dashes.
func sanitizeFilename(title string) string {
b := core.NewBuilder()
b.Grow(len(title))
for _, r := range title {
switch {
case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9',
r == '-', r == '_', r == '.':
b.WriteRune(r)
default:
b.WriteByte('-')
}
}
return b.String()
}
// trimDashes strips leading and trailing dash characters from a string.
func trimDashes(s string) string {
start := 0
for start < len(s) && s[start] == '-' {
start++
}
end := len(s)
for end > start && s[end-1] == '-' {
end--
}
return s[start:end]
}
// --- Prompt templates --- // --- Prompt templates ---
func (s *PrepSubsystem) writePromptTemplate(template, wsDir string) { func (s *PrepSubsystem) writePromptTemplate(template, wsDir string) {
@ -434,7 +458,7 @@ Do NOT push. Commit only — a reviewer will verify and push.
prompt = "Read TODO.md and complete the task. Work in src/.\n" prompt = "Read TODO.md and complete the task. Work in src/.\n"
} }
_ = writeAtomic(filepath.Join(wsDir, "src", "PROMPT.md"), prompt) _ = writeAtomic(core.Path(wsDir, "src", "PROMPT.md"), prompt)
} }
// --- Plan template rendering --- // --- Plan template rendering ---
@ -443,11 +467,11 @@ Do NOT push. Commit only — a reviewer will verify and push.
// and writes PLAN.md into the workspace src/ directory. // and writes PLAN.md into the workspace src/ directory.
func (s *PrepSubsystem) writePlanFromTemplate(templateSlug string, variables map[string]string, task string, wsDir string) { func (s *PrepSubsystem) writePlanFromTemplate(templateSlug string, variables map[string]string, task string, wsDir string) {
// Look for template in core/agent/prompts/templates/ // Look for template in core/agent/prompts/templates/
templatePath := filepath.Join(s.codePath, "core", "agent", "prompts", "templates", templateSlug+".yaml") templatePath := core.Path(s.codePath, "core", "agent", "prompts", "templates", templateSlug+".yaml")
content, err := coreio.Local.Read(templatePath) content, err := coreio.Local.Read(templatePath)
if err != nil { if err != nil {
// Try .yml extension // Try .yml extension
templatePath = filepath.Join(s.codePath, "core", "agent", "prompts", "templates", templateSlug+".yml") templatePath = core.Path(s.codePath, "core", "agent", "prompts", "templates", templateSlug+".yml")
content, err = coreio.Local.Read(templatePath) content, err = coreio.Local.Read(templatePath)
if err != nil { if err != nil {
return // Template not found, skip silently return // Template not found, skip silently
@ -456,8 +480,8 @@ func (s *PrepSubsystem) writePlanFromTemplate(templateSlug string, variables map
// Substitute variables ({{variable_name}} → value) // Substitute variables ({{variable_name}} → value)
for key, value := range variables { for key, value := range variables {
content = strings.ReplaceAll(content, "{{"+key+"}}", value) content = core.Replace(content, "{{"+key+"}}", value)
content = strings.ReplaceAll(content, "{{ "+key+" }}", value) content = core.Replace(content, "{{ "+key+" }}", value)
} }
// Parse the YAML to render as markdown // Parse the YAML to render as markdown
@ -477,7 +501,7 @@ func (s *PrepSubsystem) writePlanFromTemplate(templateSlug string, variables map
} }
// Render as PLAN.md // Render as PLAN.md
var plan strings.Builder plan := core.NewBuilder()
plan.WriteString("# Plan: " + tmpl.Name + "\n\n") plan.WriteString("# Plan: " + tmpl.Name + "\n\n")
if task != "" { if task != "" {
plan.WriteString("**Task:** " + task + "\n\n") plan.WriteString("**Task:** " + task + "\n\n")
@ -495,7 +519,7 @@ func (s *PrepSubsystem) writePlanFromTemplate(templateSlug string, variables map
} }
for i, phase := range tmpl.Phases { for i, phase := range tmpl.Phases {
plan.WriteString(fmt.Sprintf("## Phase %d: %s\n\n", i+1, phase.Name)) plan.WriteString(core.Sprintf("## Phase %d: %s\n\n", i+1, phase.Name))
if phase.Description != "" { if phase.Description != "" {
plan.WriteString(phase.Description + "\n\n") plan.WriteString(phase.Description + "\n\n")
} }
@ -512,7 +536,7 @@ func (s *PrepSubsystem) writePlanFromTemplate(templateSlug string, variables map
plan.WriteString("\n**Commit after completing this phase.**\n\n---\n\n") plan.WriteString("\n**Commit after completing this phase.**\n\n---\n\n")
} }
_ = writeAtomic(filepath.Join(wsDir, "src", "PLAN.md"), plan.String()) _ = writeAtomic(core.Path(wsDir, "src", "PLAN.md"), plan.String())
} }
// --- Helpers (unchanged) --- // --- Helpers (unchanged) ---
@ -522,7 +546,7 @@ func (s *PrepSubsystem) pullWiki(ctx context.Context, org, repo, wsDir string) i
return 0 return 0
} }
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/wiki/pages", s.forgeURL, org, repo) url := core.Sprintf("%s/api/v1/repos/%s/%s/wiki/pages", s.forgeURL, org, repo)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil) req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil { if err != nil {
return 0 return 0
@ -553,7 +577,7 @@ func (s *PrepSubsystem) pullWiki(ctx context.Context, org, repo, wsDir string) i
subURL = page.Title subURL = page.Title
} }
pageURL := fmt.Sprintf("%s/api/v1/repos/%s/%s/wiki/page/%s", s.forgeURL, org, repo, subURL) pageURL := core.Sprintf("%s/api/v1/repos/%s/%s/wiki/page/%s", s.forgeURL, org, repo, subURL)
pageReq, err := http.NewRequestWithContext(ctx, "GET", pageURL, nil) pageReq, err := http.NewRequestWithContext(ctx, "GET", pageURL, nil)
if err != nil { if err != nil {
continue continue
@ -585,14 +609,9 @@ func (s *PrepSubsystem) pullWiki(ctx context.Context, org, repo, wsDir string) i
if err != nil { if err != nil {
continue continue
} }
filename := strings.Map(func(r rune) rune { filename := sanitizeFilename(page.Title) + ".md"
if r >= 'a' && r <= 'z' || r >= 'A' && r <= 'Z' || r >= '0' && r <= '9' || r == '-' || r == '_' || r == '.' {
return r
}
return '-'
}, page.Title) + ".md"
_ = writeAtomic(filepath.Join(wsDir, "src", "kb", filename), string(content)) _ = writeAtomic(core.Path(wsDir, "src", "kb", filename), string(content))
count++ count++
} }
@ -604,9 +623,9 @@ func (s *PrepSubsystem) copySpecs(wsDir string) int {
count := 0 count := 0
for _, file := range specFiles { for _, file := range specFiles {
src := filepath.Join(s.specsPath, file) src := core.Path(s.specsPath, file)
if data, err := coreio.Local.Read(src); err == nil { if data, err := coreio.Local.Read(src); err == nil {
_ = writeAtomic(filepath.Join(wsDir, "src", "specs", file), data) _ = writeAtomic(core.Path(wsDir, "src", "specs", file), data)
count++ count++
} }
} }
@ -629,7 +648,7 @@ func (s *PrepSubsystem) generateContext(ctx context.Context, repo, wsDir string)
return 0 return 0
} }
req, err := http.NewRequestWithContext(ctx, "POST", s.brainURL+"/v1/brain/recall", strings.NewReader(string(body))) req, err := http.NewRequestWithContext(ctx, "POST", s.brainURL+"/v1/brain/recall", core.NewReader(string(body)))
if err != nil { if err != nil {
return 0 return 0
} }
@ -646,18 +665,18 @@ func (s *PrepSubsystem) generateContext(ctx context.Context, repo, wsDir string)
return 0 return 0
} }
respData, err := goio.ReadAll(resp.Body) readResult := core.ReadAll(resp.Body)
if err != nil { if !readResult.OK {
return 0 return 0
} }
var result struct { var result struct {
Memories []map[string]any `json:"memories"` Memories []map[string]any `json:"memories"`
} }
if err := json.Unmarshal(respData, &result); err != nil { if ur := core.JSONUnmarshal([]byte(readResult.Value.(string)), &result); !ur.OK {
return 0 return 0
} }
var content strings.Builder content := core.NewBuilder()
content.WriteString("# Context — " + repo + "\n\n") content.WriteString("# Context — " + repo + "\n\n")
content.WriteString("> Relevant knowledge from OpenBrain.\n\n") content.WriteString("> Relevant knowledge from OpenBrain.\n\n")
@ -666,15 +685,15 @@ func (s *PrepSubsystem) generateContext(ctx context.Context, repo, wsDir string)
memContent, _ := mem["content"].(string) memContent, _ := mem["content"].(string)
memProject, _ := mem["project"].(string) memProject, _ := mem["project"].(string)
score, _ := mem["score"].(float64) score, _ := mem["score"].(float64)
content.WriteString(fmt.Sprintf("### %d. %s [%s] (score: %.3f)\n\n%s\n\n", i+1, memProject, memType, score, memContent)) content.WriteString(core.Sprintf("### %d. %s [%s] (score: %.3f)\n\n%s\n\n", i+1, memProject, memType, score, memContent))
} }
_ = writeAtomic(filepath.Join(wsDir, "src", "CONTEXT.md"), content.String()) _ = writeAtomic(core.Path(wsDir, "src", "CONTEXT.md"), content.String())
return len(result.Memories) return len(result.Memories)
} }
func (s *PrepSubsystem) findConsumers(repo, wsDir string) int { func (s *PrepSubsystem) findConsumers(repo, wsDir string) int {
goWorkPath := filepath.Join(s.codePath, "go.work") goWorkPath := core.Path(s.codePath, "go.work")
modulePath := "forge.lthn.ai/core/" + repo modulePath := "forge.lthn.ai/core/" + repo
workData, err := coreio.Local.Read(goWorkPath) workData, err := coreio.Local.Read(goWorkPath)
@ -683,19 +702,19 @@ func (s *PrepSubsystem) findConsumers(repo, wsDir string) int {
} }
var consumers []string var consumers []string
for _, line := range strings.Split(workData, "\n") { for _, line := range core.Split(workData, "\n") {
line = strings.TrimSpace(line) line = core.Trim(line)
if !strings.HasPrefix(line, "./") { if !core.HasPrefix(line, "./") {
continue continue
} }
dir := filepath.Join(s.codePath, strings.TrimPrefix(line, "./")) dir := core.Path(s.codePath, core.TrimPrefix(line, "./"))
goMod := filepath.Join(dir, "go.mod") goMod := core.Path(dir, "go.mod")
modData, err := coreio.Local.Read(goMod) modData, err := coreio.Local.Read(goMod)
if err != nil { if err != nil {
continue continue
} }
if strings.Contains(modData, modulePath) && !strings.HasPrefix(modData, "module "+modulePath) { if core.Contains(modData, modulePath) && !core.HasPrefix(modData, "module "+modulePath) {
consumers = append(consumers, filepath.Base(dir)) consumers = append(consumers, core.PathBase(dir))
} }
} }
@ -705,8 +724,8 @@ func (s *PrepSubsystem) findConsumers(repo, wsDir string) int {
for _, c := range consumers { for _, c := range consumers {
content += "- " + c + "\n" content += "- " + c + "\n"
} }
content += fmt.Sprintf("\n**Breaking change risk: %d consumers.**\n", len(consumers)) content += core.Sprintf("\n**Breaking change risk: %d consumers.**\n", len(consumers))
_ = writeAtomic(filepath.Join(wsDir, "src", "CONSUMERS.md"), content) _ = writeAtomic(core.Path(wsDir, "src", "CONSUMERS.md"), content)
} }
return len(consumers) return len(consumers)
@ -720,10 +739,10 @@ func (s *PrepSubsystem) gitLog(repoPath, wsDir string) int {
return 0 return 0
} }
lines := strings.Split(strings.TrimSpace(string(output)), "\n") lines := core.Split(core.Trim(string(output)), "\n")
if len(lines) > 0 && lines[0] != "" { if len(lines) > 0 && lines[0] != "" {
content := "# Recent Changes\n\n```\n" + string(output) + "```\n" content := "# Recent Changes\n\n```\n" + string(output) + "```\n"
_ = writeAtomic(filepath.Join(wsDir, "src", "RECENT.md"), content) _ = writeAtomic(core.Path(wsDir, "src", "RECENT.md"), content)
} }
return len(lines) return len(lines)
@ -734,7 +753,7 @@ func (s *PrepSubsystem) generateTodo(ctx context.Context, org, repo string, issu
return return
} }
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue) url := core.Sprintf("%s/api/v1/repos/%s/%s/issues/%d", s.forgeURL, org, repo, issue)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("Authorization", "token "+s.forgeToken) req.Header.Set("Authorization", "token "+s.forgeToken)
@ -753,11 +772,11 @@ func (s *PrepSubsystem) generateTodo(ctx context.Context, org, repo string, issu
} }
json.NewDecoder(resp.Body).Decode(&issueData) json.NewDecoder(resp.Body).Decode(&issueData)
content := fmt.Sprintf("# TASK: %s\n\n", issueData.Title) content := core.Sprintf("# TASK: %s\n\n", issueData.Title)
content += fmt.Sprintf("**Status:** ready\n") content += core.Sprintf("**Status:** ready\n")
content += fmt.Sprintf("**Source:** %s/%s/%s/issues/%d\n", s.forgeURL, org, repo, issue) content += core.Sprintf("**Source:** %s/%s/%s/issues/%d\n", s.forgeURL, org, repo, issue)
content += fmt.Sprintf("**Repo:** %s/%s\n\n---\n\n", org, repo) content += core.Sprintf("**Repo:** %s/%s\n\n---\n\n", org, repo)
content += "## Objective\n\n" + issueData.Body + "\n" content += "## Objective\n\n" + issueData.Body + "\n"
_ = writeAtomic(filepath.Join(wsDir, "src", "TODO.md"), content) _ = writeAtomic(core.Path(wsDir, "src", "TODO.md"), content)
} }

View file

@ -3,18 +3,19 @@
package agentic package agentic
import ( import (
"fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"strings"
"syscall" "syscall"
"time" "time"
coreio "forge.lthn.ai/core/go-io" core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
// os.Create, os.Open, os.DevNull, os.Environ, os.FindProcess are used for
// process spawning and management — no core equivalents for these OS primitives.
// DispatchConfig controls agent dispatch behaviour. // DispatchConfig controls agent dispatch behaviour.
type DispatchConfig struct { type DispatchConfig struct {
DefaultAgent string `yaml:"default_agent"` DefaultAgent string `yaml:"default_agent"`
@ -43,7 +44,7 @@ type AgentsConfig struct {
// loadAgentsConfig reads config/agents.yaml from the code path. // loadAgentsConfig reads config/agents.yaml from the code path.
func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig { func (s *PrepSubsystem) loadAgentsConfig() *AgentsConfig {
paths := []string{ paths := []string{
filepath.Join(s.codePath, ".core", "agents.yaml"), core.Path(s.codePath, ".core", "agents.yaml"),
} }
for _, path := range paths { for _, path := range paths {
@ -79,9 +80,16 @@ func (s *PrepSubsystem) delayForAgent(agent string) time.Duration {
return 0 return 0
} }
// Parse reset time // Parse reset time (format: "HH:MM")
resetHour, resetMin := 6, 0 resetHour, resetMin := 6, 0
fmt.Sscanf(rate.ResetUTC, "%d:%d", &resetHour, &resetMin) if parts := core.Split(rate.ResetUTC, ":"); len(parts) == 2 {
if h, ok := parseSimpleInt(parts[0]); ok {
resetHour = h
}
if m, ok := parseSimpleInt(parts[1]); ok {
resetMin = m
}
}
now := time.Now().UTC() now := time.Now().UTC()
resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC) resetToday := time.Date(now.Year(), now.Month(), now.Day(), resetHour, resetMin, 0, 0, time.UTC)
@ -115,9 +123,9 @@ func (s *PrepSubsystem) listWorkspaceDirs() []string {
if !entry.IsDir() { if !entry.IsDir() {
continue continue
} }
path := filepath.Join(wsRoot, entry.Name()) path := core.Path(wsRoot, entry.Name())
// Check if this dir has a status.json (it's a workspace) // Check if this dir has a status.json (it's a workspace)
if coreio.Local.IsFile(filepath.Join(path, "status.json")) { if coreio.Local.IsFile(core.Path(path, "status.json")) {
dirs = append(dirs, path) dirs = append(dirs, path)
continue continue
} }
@ -128,8 +136,8 @@ func (s *PrepSubsystem) listWorkspaceDirs() []string {
} }
for _, sub := range subEntries { for _, sub := range subEntries {
if sub.IsDir() { if sub.IsDir() {
subPath := filepath.Join(path, sub.Name()) subPath := core.Path(path, sub.Name())
if coreio.Local.IsFile(filepath.Join(subPath, "status.json")) { if coreio.Local.IsFile(core.Path(subPath, "status.json")) {
dirs = append(dirs, subPath) dirs = append(dirs, subPath)
} }
} }
@ -146,7 +154,7 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
if err != nil || st.Status != "running" { if err != nil || st.Status != "running" {
continue continue
} }
stBase := strings.SplitN(st.Agent, ":", 2)[0] stBase := core.SplitN(st.Agent, ":", 2)[0]
if stBase != agent { if stBase != agent {
continue continue
} }
@ -162,7 +170,7 @@ func (s *PrepSubsystem) countRunningByAgent(agent string) int {
// baseAgent strips the model variant (gemini:flash → gemini). // baseAgent strips the model variant (gemini:flash → gemini).
func baseAgent(agent string) string { func baseAgent(agent string) string {
return strings.SplitN(agent, ":", 2)[0] return core.SplitN(agent, ":", 2)[0]
} }
// canDispatchAgent checks if we're under the concurrency limit for a specific agent type. // canDispatchAgent checks if we're under the concurrency limit for a specific agent type.
@ -176,6 +184,23 @@ func (s *PrepSubsystem) canDispatchAgent(agent string) bool {
return s.countRunningByAgent(base) < limit return s.countRunningByAgent(base) < limit
} }
// parseSimpleInt parses a small non-negative integer from a string.
// Returns (value, true) on success, (0, false) on failure.
func parseSimpleInt(s string) (int, bool) {
s = core.Trim(s)
if s == "" {
return 0, false
}
n := 0
for _, r := range s {
if r < '0' || r > '9' {
return 0, false
}
n = n*10 + int(r-'0')
}
return n, true
}
// canDispatch is kept for backwards compat. // canDispatch is kept for backwards compat.
func (s *PrepSubsystem) canDispatch() bool { func (s *PrepSubsystem) canDispatch() bool {
return true return true
@ -205,7 +230,7 @@ func (s *PrepSubsystem) drainQueue() {
continue continue
} }
srcDir := filepath.Join(wsDir, "src") srcDir := core.Path(wsDir, "src")
prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory." prompt := "Read PROMPT.md for instructions. All context files (CLAUDE.md, TODO.md, CONTEXT.md, CONSUMERS.md, RECENT.md) are in the parent directory. Work in this directory."
command, args, err := agentCommand(st.Agent, prompt) command, args, err := agentCommand(st.Agent, prompt)
@ -213,7 +238,7 @@ func (s *PrepSubsystem) drainQueue() {
continue continue
} }
outputFile := filepath.Join(wsDir, fmt.Sprintf("agent-%s.log", st.Agent)) outputFile := core.Path(wsDir, core.Sprintf("agent-%s.log", st.Agent))
outFile, err := os.Create(outputFile) outFile, err := os.Create(outputFile)
if err != nil { if err != nil {
continue continue

View file

@ -5,19 +5,18 @@ package agentic
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"os"
"os/exec" "os/exec"
"path/filepath"
"regexp" "regexp"
"strconv" "strconv"
"strings"
"time" "time"
coreerr "forge.lthn.ai/core/go-log" core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
) )
func listLocalRepos(basePath string) []string { func listLocalRepos(basePath string) []string {
entries, err := os.ReadDir(basePath) entries, err := coreio.Local.List(basePath)
if err != nil { if err != nil {
return nil return nil
} }
@ -35,7 +34,7 @@ func hasRemote(repoDir, remote string) bool {
cmd := exec.Command("git", "remote", "get-url", remote) cmd := exec.Command("git", "remote", "get-url", remote)
cmd.Dir = repoDir cmd.Dir = repoDir
if out, err := cmd.Output(); err == nil { if out, err := cmd.Output(); err == nil {
return strings.TrimSpace(string(out)) != "" return core.Trim(string(out)) != ""
} }
return false return false
} }
@ -48,7 +47,7 @@ func commitsAhead(repoDir, baseRef, headRef string) int {
return 0 return 0
} }
count, err := parsePositiveInt(strings.TrimSpace(string(out))) count, err := parsePositiveInt(core.Trim(string(out)))
if err != nil { if err != nil {
return 0 return 0
} }
@ -64,8 +63,8 @@ func filesChanged(repoDir, baseRef, headRef string) int {
} }
count := 0 count := 0
for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") { for _, line := range core.Split(core.Trim(string(out)), "\n") {
if strings.TrimSpace(line) != "" { if core.Trim(line) != "" {
count++ count++
} }
} }
@ -79,11 +78,11 @@ func gitOutput(repoDir string, args ...string) (string, error) {
if err != nil { if err != nil {
return "", coreerr.E("gitOutput", string(out), err) return "", coreerr.E("gitOutput", string(out), err)
} }
return strings.TrimSpace(string(out)), nil return core.Trim(string(out)), nil
} }
func parsePositiveInt(value string) (int, error) { func parsePositiveInt(value string) (int, error) {
value = strings.TrimSpace(value) value = core.Trim(value)
if value == "" { if value == "" {
return 0, coreerr.E("parsePositiveInt", "empty value", nil) return 0, coreerr.E("parsePositiveInt", "empty value", nil)
} }
@ -148,11 +147,11 @@ func createGitHubPR(ctx context.Context, repoDir, repo string, commits, files in
return "", coreerr.E("createGitHubPR", string(out), err) return "", coreerr.E("createGitHubPR", string(out), err)
} }
lines := strings.Split(strings.TrimSpace(string(out)), "\n") lines := core.Split(core.Trim(string(out)), "\n")
if len(lines) == 0 { if len(lines) == 0 {
return "", nil return "", nil
} }
return strings.TrimSpace(lines[len(lines)-1]), nil return core.Trim(lines[len(lines)-1]), nil
} }
func ensureDevBranch(repoDir string) error { func ensureDevBranch(repoDir string) error {
@ -194,7 +193,7 @@ func parseRetryAfter(detail string) time.Duration {
return 5 * time.Minute return 5 * time.Minute
} }
switch strings.ToLower(match[2]) { switch core.Lower(match[2]) {
case "hour", "hours": case "hour", "hours":
return time.Duration(n) * time.Hour return time.Duration(n) * time.Hour
case "second", "seconds": case "second", "seconds":
@ -205,5 +204,5 @@ func parseRetryAfter(detail string) time.Duration {
} }
func repoRootFromCodePath(codePath string) string { func repoRootFromCodePath(codePath string) string {
return filepath.Join(codePath, "core") return core.Path(codePath, "core")
} }

View file

@ -4,16 +4,14 @@ package agentic
import ( import (
"context" "context"
"fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"strings"
"syscall" "syscall"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -52,8 +50,8 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
return nil, ResumeOutput{}, coreerr.E("resume", "workspace is required", nil) return nil, ResumeOutput{}, coreerr.E("resume", "workspace is required", nil)
} }
wsDir := filepath.Join(s.workspaceRoot(), input.Workspace) wsDir := core.Path(s.workspaceRoot(), input.Workspace)
srcDir := filepath.Join(wsDir, "src") srcDir := core.Path(wsDir, "src")
// Verify workspace exists // Verify workspace exists
if _, err := coreio.Local.List(srcDir); err != nil { if _, err := coreio.Local.List(srcDir); err != nil {
@ -78,8 +76,8 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
// Write ANSWER.md if answer provided // Write ANSWER.md if answer provided
if input.Answer != "" { if input.Answer != "" {
answerPath := filepath.Join(srcDir, "ANSWER.md") answerPath := core.Path(srcDir, "ANSWER.md")
content := fmt.Sprintf("# Answer\n\n%s\n", input.Answer) content := core.Sprintf("# Answer\n\n%s\n", input.Answer)
if err := writeAtomic(answerPath, content); err != nil { if err := writeAtomic(answerPath, content); err != nil {
return nil, ResumeOutput{}, coreerr.E("resume", "failed to write ANSWER.md", err) return nil, ResumeOutput{}, coreerr.E("resume", "failed to write ANSWER.md", err)
} }
@ -102,7 +100,7 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
} }
// Spawn agent as detached process (survives parent death) // Spawn agent as detached process (survives parent death)
outputFile := filepath.Join(wsDir, fmt.Sprintf("agent-%s-run%d.log", agent, st.Runs+1)) outputFile := core.Path(wsDir, core.Sprintf("agent-%s-run%d.log", agent, st.Runs+1))
command, args, err := agentCommand(agent, prompt) command, args, err := agentCommand(agent, prompt)
if err != nil { if err != nil {
@ -154,10 +152,10 @@ func (s *PrepSubsystem) resume(ctx context.Context, _ *mcp.CallToolRequest, inpu
"branch": st.Branch, "branch": st.Branch,
} }
if data, err := coreio.Local.Read(filepath.Join(srcDir, "BLOCKED.md")); err == nil { if data, err := coreio.Local.Read(core.Path(srcDir, "BLOCKED.md")); err == nil {
status = "blocked" status = "blocked"
channel = coremcp.ChannelAgentBlocked channel = coremcp.ChannelAgentBlocked
st.Question = strings.TrimSpace(data) st.Question = core.Trim(data)
if st.Question != "" { if st.Question != "" {
payload["question"] = st.Question payload["question"] = st.Question
} }

View file

@ -5,16 +5,14 @@ package agentic
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"os/exec" "os/exec"
"path/filepath"
"regexp" "regexp"
"strings"
"time" "time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -93,7 +91,7 @@ func (s *PrepSubsystem) reviewQueue(ctx context.Context, _ *mcp.CallToolRequest,
continue continue
} }
repoDir := filepath.Join(basePath, repo) repoDir := core.Path(basePath, repo)
reviewer := input.Reviewer reviewer := input.Reviewer
if reviewer == "" { if reviewer == "" {
reviewer = "coderabbit" reviewer = "coderabbit"
@ -137,7 +135,7 @@ func (s *PrepSubsystem) findReviewCandidates(basePath string) []string {
if !entry.IsDir() { if !entry.IsDir() {
continue continue
} }
repoDir := filepath.Join(basePath, entry.Name()) repoDir := core.Path(basePath, entry.Name())
if !hasRemote(repoDir, "github") { if !hasRemote(repoDir, "github") {
continue continue
} }
@ -154,22 +152,22 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer
if rl := s.loadRateLimitState(); rl != nil && rl.Limited && time.Now().Before(rl.RetryAt) { if rl := s.loadRateLimitState(); rl != nil && rl.Limited && time.Now().Before(rl.RetryAt) {
result.Verdict = "rate_limited" result.Verdict = "rate_limited"
result.Detail = fmt.Sprintf("retry after %s", rl.RetryAt.Format(time.RFC3339)) result.Detail = core.Sprintf("retry after %s", rl.RetryAt.Format(time.RFC3339))
return result return result
} }
cmd := reviewerCommand(ctx, repoDir, reviewer) cmd := reviewerCommand(ctx, repoDir, reviewer)
cmd.Dir = repoDir cmd.Dir = repoDir
out, err := cmd.CombinedOutput() out, err := cmd.CombinedOutput()
output := strings.TrimSpace(string(out)) output := core.Trim(string(out))
if strings.Contains(strings.ToLower(output), "rate limit") { if core.Contains(core.Lower(output), "rate limit") {
result.Verdict = "rate_limited" result.Verdict = "rate_limited"
result.Detail = output result.Detail = output
return result return result
} }
if err != nil && !strings.Contains(output, "No findings") && !strings.Contains(output, "no issues") { if err != nil && !core.Contains(output, "No findings") && !core.Contains(output, "no issues") {
result.Verdict = "error" result.Verdict = "error"
if output != "" { if output != "" {
result.Detail = output result.Detail = output
@ -182,7 +180,7 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer
s.storeReviewOutput(repoDir, repo, reviewer, output) s.storeReviewOutput(repoDir, repo, reviewer, output)
result.Findings = countFindingHints(output) result.Findings = countFindingHints(output)
if strings.Contains(output, "No findings") || strings.Contains(output, "no issues") || strings.Contains(output, "LGTM") { if core.Contains(output, "No findings") || core.Contains(output, "no issues") || core.Contains(output, "LGTM") {
result.Verdict = "clean" result.Verdict = "clean"
if dryRun { if dryRun {
result.Action = "skipped (dry run)" result.Action = "skipped (dry run)"
@ -198,7 +196,7 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer
mergeCmd.Dir = repoDir mergeCmd.Dir = repoDir
if mergeOut, err := mergeCmd.CombinedOutput(); err == nil { if mergeOut, err := mergeCmd.CombinedOutput(); err == nil {
result.Action = "merged" result.Action = "merged"
result.Detail = strings.TrimSpace(string(mergeOut)) result.Detail = core.Trim(string(mergeOut))
return result return result
} }
} }
@ -219,7 +217,7 @@ func (s *PrepSubsystem) reviewRepo(ctx context.Context, repoDir, repo, reviewer
func (s *PrepSubsystem) storeReviewOutput(repoDir, repo, reviewer, output string) { func (s *PrepSubsystem) storeReviewOutput(repoDir, repo, reviewer, output string) {
home := reviewQueueHomeDir() home := reviewQueueHomeDir()
dataDir := filepath.Join(home, ".core", "training", "reviews") dataDir := core.Path(home, ".core", "training", "reviews")
if err := coreio.Local.EnsureDir(dataDir); err != nil { if err := coreio.Local.EnsureDir(dataDir); err != nil {
return return
} }
@ -235,13 +233,13 @@ func (s *PrepSubsystem) storeReviewOutput(repoDir, repo, reviewer, output string
return return
} }
name := fmt.Sprintf("%s-%s-%d.json", repo, reviewer, time.Now().Unix()) name := core.Sprintf("%s-%s-%d.json", repo, reviewer, time.Now().Unix())
_ = writeAtomic(filepath.Join(dataDir, name), string(data)) _ = writeAtomic(core.Path(dataDir, name), string(data))
} }
func (s *PrepSubsystem) saveRateLimitState(info *RateLimitInfo) { func (s *PrepSubsystem) saveRateLimitState(info *RateLimitInfo) {
home := reviewQueueHomeDir() home := reviewQueueHomeDir()
path := filepath.Join(home, ".core", "coderabbit-ratelimit.json") path := core.Path(home, ".core", "coderabbit-ratelimit.json")
data, err := json.Marshal(info) data, err := json.Marshal(info)
if err != nil { if err != nil {
return return
@ -251,7 +249,7 @@ func (s *PrepSubsystem) saveRateLimitState(info *RateLimitInfo) {
func (s *PrepSubsystem) loadRateLimitState() *RateLimitInfo { func (s *PrepSubsystem) loadRateLimitState() *RateLimitInfo {
home := reviewQueueHomeDir() home := reviewQueueHomeDir()
path := filepath.Join(home, ".core", "coderabbit-ratelimit.json") path := core.Path(home, ".core", "coderabbit-ratelimit.json")
data, err := coreio.Local.Read(path) data, err := coreio.Local.Read(path)
if err != nil { if err != nil {
return nil return nil

View file

@ -5,11 +5,10 @@ package agentic
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"strings"
coreerr "forge.lthn.ai/core/go-log" core "dappco.re/go/core"
coreerr "dappco.re/go/core/log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -81,7 +80,7 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input
seen := make(map[string]bool) seen := make(map[string]bool)
var unique []ScanIssue var unique []ScanIssue
for _, issue := range allIssues { for _, issue := range allIssues {
key := fmt.Sprintf("%s#%d", issue.Repo, issue.Number) key := core.Sprintf("%s#%d", issue.Repo, issue.Number)
if !seen[key] { if !seen[key] {
seen[key] = true seen[key] = true
unique = append(unique, issue) unique = append(unique, issue)
@ -100,7 +99,7 @@ func (s *PrepSubsystem) scan(ctx context.Context, _ *mcp.CallToolRequest, input
} }
func (s *PrepSubsystem) listOrgRepos(ctx context.Context, org string) ([]string, error) { func (s *PrepSubsystem) listOrgRepos(ctx context.Context, org string) ([]string, error) {
url := fmt.Sprintf("%s/api/v1/orgs/%s/repos?limit=50", s.forgeURL, org) url := core.Sprintf("%s/api/v1/orgs/%s/repos?limit=50", s.forgeURL, org)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("Authorization", "token "+s.forgeToken) req.Header.Set("Authorization", "token "+s.forgeToken)
@ -110,7 +109,7 @@ func (s *PrepSubsystem) listOrgRepos(ctx context.Context, org string) ([]string,
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
return nil, coreerr.E("listOrgRepos", fmt.Sprintf("HTTP %d listing repos", resp.StatusCode), nil) return nil, coreerr.E("listOrgRepos", core.Sprintf("HTTP %d listing repos", resp.StatusCode), nil)
} }
var repos []struct { var repos []struct {
@ -126,7 +125,7 @@ func (s *PrepSubsystem) listOrgRepos(ctx context.Context, org string) ([]string,
} }
func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label string) ([]ScanIssue, error) { func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label string) ([]ScanIssue, error) {
url := fmt.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&labels=%s&limit=10&type=issues", url := core.Sprintf("%s/api/v1/repos/%s/%s/issues?state=open&labels=%s&limit=10&type=issues",
s.forgeURL, org, repo, label) s.forgeURL, org, repo, label)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
req.Header.Set("Authorization", "token "+s.forgeToken) req.Header.Set("Authorization", "token "+s.forgeToken)
@ -137,7 +136,7 @@ func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label str
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != 200 { if resp.StatusCode != 200 {
return nil, coreerr.E("listRepoIssues", fmt.Sprintf("HTTP %d for "+repo, resp.StatusCode), nil) return nil, coreerr.E("listRepoIssues", core.Sprintf("HTTP %d for "+repo, resp.StatusCode), nil)
} }
var issues []struct { var issues []struct {
@ -170,7 +169,7 @@ func (s *PrepSubsystem) listRepoIssues(ctx context.Context, org, repo, label str
Title: issue.Title, Title: issue.Title,
Labels: labels, Labels: labels,
Assignee: assignee, Assignee: assignee,
URL: strings.Replace(issue.HTMLURL, "https://forge.lthn.ai", s.forgeURL, 1), URL: core.Replace(issue.HTMLURL, "https://forge.lthn.ai", s.forgeURL),
}) })
} }

View file

@ -6,16 +6,18 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"os" "os"
"path/filepath"
"strings"
"time" "time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
// os.Stat and os.FindProcess are used for workspace age detection and PID
// liveness checks — these are OS-level queries with no core equivalent.
// Workspace status file convention: // Workspace status file convention:
// //
// {workspace}/status.json — current state of the workspace // {workspace}/status.json — current state of the workspace
@ -57,23 +59,23 @@ func writeStatus(wsDir string, status *WorkspaceStatus) error {
if err != nil { if err != nil {
return err return err
} }
return writeAtomic(filepath.Join(wsDir, "status.json"), string(data)) return writeAtomic(core.JoinPath(wsDir, "status.json"), string(data))
} }
func (s *PrepSubsystem) saveStatus(wsDir string, status *WorkspaceStatus) { func (s *PrepSubsystem) saveStatus(wsDir string, status *WorkspaceStatus) {
if err := writeStatus(wsDir, status); err != nil { if err := writeStatus(wsDir, status); err != nil {
coreerr.Warn("failed to write workspace status", "workspace", filepath.Base(wsDir), "err", err) coreerr.Warn("failed to write workspace status", "workspace", core.PathBase(wsDir), "err", err)
} }
} }
func readStatus(wsDir string) (*WorkspaceStatus, error) { func readStatus(wsDir string) (*WorkspaceStatus, error) {
data, err := coreio.Local.Read(filepath.Join(wsDir, "status.json")) data, err := coreio.Local.Read(core.JoinPath(wsDir, "status.json"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
var s WorkspaceStatus var s WorkspaceStatus
if err := json.Unmarshal([]byte(data), &s); err != nil { if r := core.JSONUnmarshal([]byte(data), &s); !r.OK {
return nil, err return nil, coreerr.E("readStatus", "failed to parse status.json", nil)
} }
return &s, nil return &s, nil
} }
@ -126,7 +128,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
var workspaces []WorkspaceInfo var workspaces []WorkspaceInfo
for _, wsDir := range wsDirs { for _, wsDir := range wsDirs {
name := filepath.Base(wsDir) name := core.PathBase(wsDir)
// Filter by specific workspace if requested // Filter by specific workspace if requested
if input.Workspace != "" && name != input.Workspace { if input.Workspace != "" && name != input.Workspace {
@ -139,7 +141,7 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
st, err := readStatus(wsDir) st, err := readStatus(wsDir)
if err != nil { if err != nil {
// Legacy workspace (no status.json) — check for log file // Legacy workspace (no status.json) — check for log file
logFiles, _ := filepath.Glob(filepath.Join(wsDir, "agent-*.log")) logFiles := core.PathGlob(core.Path(wsDir, "agent-*.log"))
if len(logFiles) > 0 { if len(logFiles) > 0 {
info.Status = "completed" info.Status = "completed"
} else { } else {
@ -177,10 +179,10 @@ func (s *PrepSubsystem) status(ctx context.Context, _ *mcp.CallToolRequest, inpu
} }
// Process died — check for BLOCKED.md // Process died — check for BLOCKED.md
blockedPath := filepath.Join(wsDir, "src", "BLOCKED.md") blockedPath := core.Path(wsDir, "src", "BLOCKED.md")
if data, err := coreio.Local.Read(blockedPath); err == nil { if data, err := coreio.Local.Read(blockedPath); err == nil {
info.Status = "blocked" info.Status = "blocked"
info.Question = strings.TrimSpace(data) info.Question = core.Trim(data)
st.Status = "blocked" st.Status = "blocked"
st.Question = info.Question st.Question = info.Question
status = "blocked" status = "blocked"

View file

@ -4,11 +4,11 @@ package agentic
import ( import (
"context" "context"
"path/filepath"
"time" "time"
core "dappco.re/go/core"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -69,6 +69,26 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
return nil, WatchOutput{Success: true, Duration: "0s"}, nil return nil, WatchOutput{Success: true, Duration: "0s"}, nil
} }
progressToken := any(nil)
if req != nil && req.Params != nil {
progressToken = req.Params.GetProgressToken()
}
progress := float64(0)
total := float64(len(targets))
sendProgress := func(current float64, status WorkspaceStatus) {
if req == nil || req.Session == nil || progressToken == nil {
return
}
_ = req.Session.NotifyProgress(ctx, &mcp.ProgressNotificationParams{
ProgressToken: progressToken,
Progress: current,
Total: total,
Message: core.Sprintf("%s %s (%s)", status.Repo, status.Status, status.Agent),
})
}
remaining := make(map[string]struct{}, len(targets)) remaining := make(map[string]struct{}, len(targets))
for _, workspace := range targets { for _, workspace := range targets {
remaining[workspace] = struct{}{} remaining[workspace] = struct{}{}
@ -106,6 +126,11 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
switch info.Status { switch info.Status {
case "completed", "merged", "ready-for-review": case "completed", "merged", "ready-for-review":
status := WorkspaceStatus{
Repo: info.Repo,
Agent: info.Agent,
Status: info.Status,
}
completed = append(completed, WatchResult{ completed = append(completed, WatchResult{
Workspace: info.Name, Workspace: info.Name,
Agent: info.Agent, Agent: info.Agent,
@ -116,7 +141,14 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
PRURL: info.PRURL, PRURL: info.PRURL,
}) })
delete(remaining, info.Name) delete(remaining, info.Name)
progress++
sendProgress(progress, status)
case "failed", "blocked": case "failed", "blocked":
status := WorkspaceStatus{
Repo: info.Repo,
Agent: info.Agent,
Status: info.Status,
}
failed = append(failed, WatchResult{ failed = append(failed, WatchResult{
Workspace: info.Name, Workspace: info.Name,
Agent: info.Agent, Agent: info.Agent,
@ -127,6 +159,8 @@ func (s *PrepSubsystem) watch(ctx context.Context, req *mcp.CallToolRequest, inp
PRURL: info.PRURL, PRURL: info.PRURL,
}) })
delete(remaining, info.Name) delete(remaining, info.Name)
progress++
sendProgress(progress, status)
} }
} }
} }
@ -153,15 +187,15 @@ func (s *PrepSubsystem) findActiveWorkspaces() []string {
} }
switch st.Status { switch st.Status {
case "running", "queued": case "running", "queued":
active = append(active, filepath.Base(wsDir)) active = append(active, core.PathBase(wsDir))
} }
} }
return active return active
} }
func (s *PrepSubsystem) resolveWorkspaceDir(name string) string { func (s *PrepSubsystem) resolveWorkspaceDir(name string) string {
if filepath.IsAbs(name) { if core.PathIsAbs(name) {
return name return name
} }
return filepath.Join(s.workspaceRoot(), name) return core.JoinPath(s.workspaceRoot(), name)
} }

View file

@ -4,23 +4,26 @@ package agentic
import ( import (
"os" "os"
"path/filepath"
coreio "forge.lthn.ai/core/go-io" core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
) )
// os.CreateTemp, os.Remove, os.Rename are framework-boundary calls for
// atomic file writes — no core equivalent exists for temp file creation.
// writeAtomic writes content to path by staging it in a temporary file and // writeAtomic writes content to path by staging it in a temporary file and
// renaming it into place. // renaming it into place.
// //
// This avoids exposing partially written workspace files to agents that may // This avoids exposing partially written workspace files to agents that may
// read status, prompt, or plan documents while they are being updated. // read status, prompt, or plan documents while they are being updated.
func writeAtomic(path, content string) error { func writeAtomic(path, content string) error {
dir := filepath.Dir(path) dir := core.PathDir(path)
if err := coreio.Local.EnsureDir(dir); err != nil { if err := coreio.Local.EnsureDir(dir); err != nil {
return err return err
} }
tmp, err := os.CreateTemp(dir, "."+filepath.Base(path)+".*.tmp") tmp, err := os.CreateTemp(dir, "."+core.PathBase(path)+".*.tmp")
if err != nil { if err != nil {
return err return err
} }

400
pkg/mcp/authz.go Normal file
View file

@ -0,0 +1,400 @@
// SPDX-License-Identifier: EUPL-1.2
package mcp
import (
"context"
"crypto/hmac"
"crypto/sha256"
"crypto/subtle"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"time"
core "dappco.re/go/core"
coreerr "dappco.re/go/core/log"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
const (
// authTokenPrefix is the prefix used by HTTP Authorization headers.
authTokenPrefix = "Bearer "
// authDefaultJWTTTL is the default validity duration for minted JWTs.
authDefaultJWTTTL = time.Hour
// authJWTSecretEnv is the HMAC secret used for JWT signing and verification.
authJWTSecretEnv = "MCP_JWT_SECRET"
// authJWTTTLSecondsEnv allows overriding token lifetime.
authJWTTTLSecondsEnv = "MCP_JWT_TTL_SECONDS"
)
// authClaims is the compact claim payload stored inside our internal JWTs.
type authClaims struct {
Workspace string `json:"workspace,omitempty"`
Entitlements []string `json:"entitlements,omitempty"`
Subject string `json:"sub,omitempty"`
Issuer string `json:"iss,omitempty"`
IssuedAt int64 `json:"iat,omitempty"`
ExpiresAt int64 `json:"exp,omitempty"`
}
type authContextKey struct{}
func withAuthClaims(ctx context.Context, claims *authClaims) context.Context {
if ctx == nil {
return context.Background()
}
return context.WithValue(ctx, authContextKey{}, claims)
}
func claimsFromContext(ctx context.Context) *authClaims {
if ctx == nil {
return nil
}
if c := ctx.Value(authContextKey{}); c != nil {
if cl, ok := c.(*authClaims); ok {
return cl
}
}
return nil
}
// authConfig holds token verification options derived from environment.
type authConfig struct {
apiToken string
secret []byte
ttl time.Duration
}
func currentAuthConfig(apiToken string) authConfig {
cfg := authConfig{
apiToken: apiToken,
secret: []byte(core.Env(authJWTSecretEnv)),
ttl: authDefaultJWTTTL,
}
if len(cfg.secret) == 0 {
cfg.secret = []byte(apiToken)
}
if ttlRaw := core.Trim(core.Env(authJWTTTLSecondsEnv)); ttlRaw != "" {
if ttlVal, err := strconv.Atoi(ttlRaw); err == nil && ttlVal > 0 {
cfg.ttl = time.Duration(ttlVal) * time.Second
}
}
return cfg
}
func extractBearerToken(raw string) string {
raw = strings.TrimSpace(raw)
if strings.HasPrefix(raw, authTokenPrefix) {
return strings.TrimSpace(strings.TrimPrefix(raw, authTokenPrefix))
}
return ""
}
func parseAuthClaims(authToken, apiToken string) (*authClaims, error) {
cfg := currentAuthConfig(apiToken)
if cfg.apiToken == "" {
return nil, nil
}
tkn := extractBearerToken(authToken)
if tkn == "" {
return nil, errors.New("missing bearer token")
}
if subtle.ConstantTimeCompare([]byte(tkn), []byte(cfg.apiToken)) == 1 {
return &authClaims{
Subject: "api-key",
IssuedAt: time.Now().Unix(),
}, nil
}
if len(cfg.secret) == 0 {
return nil, errors.New("jwt secret is not configured")
}
parts := strings.Split(tkn, ".")
if len(parts) != 3 {
return nil, errors.New("invalid token format")
}
headerJSON, err := decodeJWTSection(parts[0])
if err != nil {
return nil, err
}
var header map[string]any
if err := json.Unmarshal(headerJSON, &header); err != nil {
return nil, err
}
if alg, _ := header["alg"].(string); alg != "" && alg != "HS256" {
return nil, fmt.Errorf("unsupported jwt algorithm: %s", alg)
}
signatureBase := parts[0] + "." + parts[1]
mac := hmac.New(sha256.New, cfg.secret)
mac.Write([]byte(signatureBase))
expectedSig := mac.Sum(nil)
actualSig, err := decodeJWTSection(parts[2])
if err != nil {
return nil, err
}
if !hmac.Equal(expectedSig, actualSig) {
return nil, errors.New("invalid token signature")
}
payloadJSON, err := decodeJWTSection(parts[1])
if err != nil {
return nil, err
}
var claims authClaims
if err := json.Unmarshal(payloadJSON, &claims); err != nil {
return nil, err
}
now := time.Now().Unix()
if claims.ExpiresAt > 0 && claims.ExpiresAt < now {
return nil, errors.New("token has expired")
}
return &claims, nil
}
func decodeJWTSection(value string) ([]byte, error) {
raw, err := base64.RawURLEncoding.DecodeString(value)
if err != nil {
return nil, err
}
return raw, nil
}
func encodeJWTSection(value []byte) string {
return base64.RawURLEncoding.EncodeToString(value)
}
func mintJWTToken(rawClaims authClaims, cfg authConfig) (string, error) {
now := time.Now().Unix()
if rawClaims.IssuedAt == 0 {
rawClaims.IssuedAt = now
}
if rawClaims.ExpiresAt == 0 {
rawClaims.ExpiresAt = now + int64(cfg.ttl.Seconds())
}
header := map[string]string{
"alg": "HS256",
"typ": "JWT",
}
headerJSON, err := json.Marshal(header)
if err != nil {
return "", err
}
payloadJSON, err := json.Marshal(rawClaims)
if err != nil {
return "", err
}
signingInput := encodeJWTSection(headerJSON) + "." + encodeJWTSection(payloadJSON)
mac := hmac.New(sha256.New, cfg.secret)
mac.Write([]byte(signingInput))
signature := mac.Sum(nil)
return signingInput + "." + encodeJWTSection(signature), nil
}
func authClaimsFromToolRequest(ctx context.Context, req *mcp.CallToolRequest, apiToken string) (claims *authClaims, inTransport bool, err error) {
cfg := currentAuthConfig(apiToken)
if cfg.apiToken == "" {
return nil, false, nil
}
if req != nil {
extra := req.GetExtra()
if extra == nil || extra.Header == nil {
return nil, true, errors.New("missing request auth metadata")
}
raw := extra.Header.Get("Authorization")
parsed, err := parseAuthClaims(raw, apiToken)
if err != nil {
return nil, true, err
}
return parsed, true, nil
}
if claims = claimsFromContext(ctx); claims != nil {
return claims, true, nil
}
return nil, false, nil
}
func (s *Service) authorizeToolAccess(ctx context.Context, req *mcp.CallToolRequest, tool string, input any) error {
apiToken := core.Env("MCP_AUTH_TOKEN")
cfg := currentAuthConfig(apiToken)
if cfg.apiToken == "" {
return nil
}
claims, inTransport, err := authClaimsFromToolRequest(ctx, req, apiToken)
if err != nil {
return coreerr.E("auth", "unauthorized", err)
}
if !inTransport {
// Allow direct service method calls in-process, while still enforcing
// transport requests where auth metadata is present.
return nil
}
if claims == nil {
return coreerr.E("auth", "unauthorized", errors.New("missing auth claims"))
}
if !claims.canRunTool(tool) {
return coreerr.E("auth", "forbidden", errors.New("tool not allowed for token"))
}
if !claims.canAccessWorkspaceFromInput(input) {
return coreerr.E("auth", "forbidden", errors.New("workspace scope mismatch"))
}
return nil
}
func (c *authClaims) canRunTool(tool string) bool {
if c == nil {
return false
}
if len(c.Entitlements) == 0 {
return true
}
toolAllow := "tool:" + tool
for _, e := range c.Entitlements {
switch e {
case "*", "tool:*", "tools:*":
return true
default:
if e == tool {
return true
}
if e == toolAllow || e == "tools:"+tool {
return true
}
}
}
return false
}
func (c *authClaims) canAccessWorkspaceFromInput(input any) bool {
if c == nil || c.Workspace == "" || c.Workspace == "*" {
return true
}
target := inputWorkspaceFromValue(input)
if target == "" {
return true
}
return workspaceMatch(c.Workspace, target)
}
func workspaceMatch(claimed, target string) bool {
if strings.TrimSpace(claimed) == "" {
return true
}
if strings.TrimSpace(target) == "" {
return true
}
if claimed == target {
return true
}
if strings.HasSuffix(claimed, "*") {
prefix := strings.TrimSuffix(claimed, "*")
return strings.HasPrefix(target, prefix)
}
return strings.HasPrefix(target, claimed+"/")
}
func inputWorkspaceFromValue(input any) string {
if input == nil {
return ""
}
v := reflect.ValueOf(input)
for v.Kind() == reflect.Pointer && !v.IsNil() {
v = v.Elem()
}
if !v.IsValid() {
return ""
}
switch v.Kind() {
case reflect.Map:
return workspaceFromMap(v)
case reflect.Struct:
return workspaceFromStruct(v)
default:
return ""
}
}
func workspaceFromMap(v reflect.Value) string {
if v.IsNil() {
return ""
}
keyType := v.Type().Key()
if keyType.Kind() != reflect.String {
return ""
}
for _, key := range []string{
"workspace",
"repo",
"repository",
"project",
"workspace_id",
} {
mapKey := reflect.ValueOf(key)
if mapKey.Type() != keyType {
if mapKey.Type().ConvertibleTo(keyType) {
mapKey = mapKey.Convert(keyType)
} else {
continue
}
}
if mapKey.IsValid() {
raw := v.MapIndex(mapKey)
if raw.IsValid() && raw.Kind() == reflect.String {
return strings.TrimSpace(raw.String())
}
}
}
return ""
}
func workspaceFromStruct(v reflect.Value) string {
t := v.Type()
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
ft := t.Field(i)
if !f.CanInterface() {
continue
}
keys := []string{strings.ToLower(ft.Name)}
if tag := ft.Tag.Get("json"); tag != "" {
keys = append(keys, strings.ToLower(strings.Split(tag, ",")[0]))
}
for _, candidate := range keys {
if candidate != "workspace" && candidate != "repo" && candidate != "repository" {
continue
}
switch f.Kind() {
case reflect.String:
if s := strings.TrimSpace(f.String()); s != "" {
return s
}
case reflect.Pointer:
if f.IsNil() {
continue
}
if f.Elem().Kind() == reflect.String {
if s := strings.TrimSpace(f.Elem().String()); s != "" {
return s
}
}
}
}
}
return ""
}

View file

@ -9,7 +9,7 @@ import (
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
"dappco.re/go/mcp/pkg/mcp/ide" "dappco.re/go/mcp/pkg/mcp/ide"
coreerr "forge.lthn.ai/core/go-log" coreerr "dappco.re/go/core/log"
) )
// errBridgeNotAvailable is returned when a tool requires the Laravel bridge // errBridgeNotAvailable is returned when a tool requires the Laravel bridge

View file

@ -3,20 +3,15 @@
package brain package brain
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt"
goio "io"
"net/http" "net/http"
"net/url" "net/url"
"os"
"strings"
"time" "time"
core "dappco.re/go/core"
coreio "dappco.re/go/core/io"
coreerr "dappco.re/go/core/log"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreio "forge.lthn.ai/core/go-io"
coreerr "forge.lthn.ai/core/go-log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -58,15 +53,16 @@ func (s *DirectSubsystem) OnChannel(fn func(ctx context.Context, channel string,
// Reads CORE_BRAIN_URL and CORE_BRAIN_KEY from environment, or falls back // Reads CORE_BRAIN_URL and CORE_BRAIN_KEY from environment, or falls back
// to ~/.claude/brain.key for the API key. // to ~/.claude/brain.key for the API key.
func NewDirect() *DirectSubsystem { func NewDirect() *DirectSubsystem {
apiURL := os.Getenv("CORE_BRAIN_URL") apiURL := core.Env("CORE_BRAIN_URL")
if apiURL == "" { if apiURL == "" {
apiURL = "https://api.lthn.sh" apiURL = "https://api.lthn.sh"
} }
apiKey := os.Getenv("CORE_BRAIN_KEY") apiKey := core.Env("CORE_BRAIN_KEY")
if apiKey == "" { if apiKey == "" {
if data, err := coreio.Local.Read(os.ExpandEnv("$HOME/.claude/brain.key")); err == nil { home := core.Env("HOME")
apiKey = strings.TrimSpace(data) if data, err := coreio.Local.Read(core.Path(home, ".claude", "brain.key")); err == nil {
apiKey = core.Trim(data)
} }
} }
@ -112,16 +108,12 @@ func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body
return nil, coreerr.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil) return nil, coreerr.E("brain.apiCall", "no API key (set CORE_BRAIN_KEY or create ~/.claude/brain.key)", nil)
} }
var reqBody goio.Reader var bodyStr string
if body != nil { if body != nil {
data, err := json.Marshal(body) bodyStr = core.JSONMarshalString(body)
if err != nil {
return nil, coreerr.E("brain.apiCall", "marshal request", err)
}
reqBody = bytes.NewReader(data)
} }
req, err := http.NewRequestWithContext(ctx, method, s.apiURL+path, reqBody) req, err := http.NewRequestWithContext(ctx, method, s.apiURL+path, core.NewReader(bodyStr))
if err != nil { if err != nil {
return nil, coreerr.E("brain.apiCall", "create request", err) return nil, coreerr.E("brain.apiCall", "create request", err)
} }
@ -135,18 +127,22 @@ func (s *DirectSubsystem) apiCall(ctx context.Context, method, path string, body
} }
defer resp.Body.Close() defer resp.Body.Close()
respData, err := goio.ReadAll(resp.Body) r := core.ReadAll(resp.Body)
if err != nil { if !r.OK {
return nil, coreerr.E("brain.apiCall", "read response", err) if readErr, ok := r.Value.(error); ok {
return nil, coreerr.E("brain.apiCall", "read response", readErr)
} }
return nil, coreerr.E("brain.apiCall", "read response failed", nil)
}
respData := r.Value.(string)
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
return nil, coreerr.E("brain.apiCall", "API returned "+string(respData), nil) return nil, coreerr.E("brain.apiCall", "API returned "+respData, nil)
} }
var result map[string]any var result map[string]any
if err := json.Unmarshal(respData, &result); err != nil { if ur := core.JSONUnmarshal([]byte(respData), &result); !ur.OK {
return nil, coreerr.E("brain.apiCall", "parse response", err) return nil, coreerr.E("brain.apiCall", "parse response", nil)
} }
return result, nil return result, nil
@ -200,30 +196,7 @@ func (s *DirectSubsystem) recall(ctx context.Context, _ *mcp.CallToolRequest, in
return nil, RecallOutput{}, err return nil, RecallOutput{}, err
} }
var memories []Memory memories := memoriesFromResult(result)
if mems, ok := result["memories"].([]any); ok {
for _, m := range mems {
if mm, ok := m.(map[string]any); ok {
mem := Memory{
Content: fmt.Sprintf("%v", mm["content"]),
Type: fmt.Sprintf("%v", mm["type"]),
Project: fmt.Sprintf("%v", mm["project"]),
AgentID: fmt.Sprintf("%v", mm["agent_id"]),
CreatedAt: fmt.Sprintf("%v", mm["created_at"]),
}
if id, ok := mm["id"].(string); ok {
mem.ID = id
}
if score, ok := mm["score"].(float64); ok {
mem.Confidence = score
}
if source, ok := mm["source"].(string); ok {
mem.Tags = append(mem.Tags, "source:"+source)
}
memories = append(memories, mem)
}
}
}
if s.onChannel != nil { if s.onChannel != nil {
s.onChannel(ctx, coremcp.ChannelBrainRecallDone, map[string]any{ s.onChannel(ctx, coremcp.ChannelBrainRecallDone, map[string]any{
@ -274,37 +247,14 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu
if input.AgentID != "" { if input.AgentID != "" {
values.Set("agent_id", input.AgentID) values.Set("agent_id", input.AgentID)
} }
values.Set("limit", fmt.Sprintf("%d", limit)) values.Set("limit", core.Sprintf("%d", limit))
result, err := s.apiCall(ctx, http.MethodGet, "/v1/brain/list?"+values.Encode(), nil) result, err := s.apiCall(ctx, http.MethodGet, "/v1/brain/list?"+values.Encode(), nil)
if err != nil { if err != nil {
return nil, ListOutput{}, err return nil, ListOutput{}, err
} }
var memories []Memory memories := memoriesFromResult(result)
if mems, ok := result["memories"].([]any); ok {
for _, m := range mems {
if mm, ok := m.(map[string]any); ok {
mem := Memory{
Content: fmt.Sprintf("%v", mm["content"]),
Type: fmt.Sprintf("%v", mm["type"]),
Project: fmt.Sprintf("%v", mm["project"]),
AgentID: fmt.Sprintf("%v", mm["agent_id"]),
CreatedAt: fmt.Sprintf("%v", mm["created_at"]),
}
if id, ok := mm["id"].(string); ok {
mem.ID = id
}
if score, ok := mm["score"].(float64); ok {
mem.Confidence = score
}
if source, ok := mm["source"].(string); ok {
mem.Tags = append(mem.Tags, "source:"+source)
}
memories = append(memories, mem)
}
}
}
if s.onChannel != nil { if s.onChannel != nil {
s.onChannel(ctx, coremcp.ChannelBrainListDone, map[string]any{ s.onChannel(ctx, coremcp.ChannelBrainListDone, map[string]any{
@ -321,3 +271,49 @@ func (s *DirectSubsystem) list(ctx context.Context, _ *mcp.CallToolRequest, inpu
Memories: memories, Memories: memories,
}, nil }, nil
} }
// memoriesFromResult extracts Memory entries from an API response map.
func memoriesFromResult(result map[string]any) []Memory {
var memories []Memory
mems, ok := result["memories"].([]any)
if !ok {
return memories
}
for _, m := range mems {
mm, ok := m.(map[string]any)
if !ok {
continue
}
mem := Memory{
Content: stringFromMap(mm, "content"),
Type: stringFromMap(mm, "type"),
Project: stringFromMap(mm, "project"),
AgentID: stringFromMap(mm, "agent_id"),
CreatedAt: stringFromMap(mm, "created_at"),
}
if id, ok := mm["id"].(string); ok {
mem.ID = id
}
if score, ok := mm["score"].(float64); ok {
mem.Confidence = score
}
if source, ok := mm["source"].(string); ok {
mem.Tags = append(mem.Tags, "source:"+source)
}
memories = append(memories, mem)
}
return memories
}
// stringFromMap extracts a string value from a map, returning "" if missing or wrong type.
func stringFromMap(m map[string]any, key string) string {
v, ok := m[key]
if !ok || v == nil {
return ""
}
s, ok := v.(string)
if !ok {
return core.Sprintf("%v", v)
}
return s
}

View file

@ -7,9 +7,9 @@ import (
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
"dappco.re/go/mcp/pkg/mcp/ide" "dappco.re/go/mcp/pkg/mcp/ide"
"forge.lthn.ai/core/api" "dappco.re/go/core/api"
"forge.lthn.ai/core/api/pkg/provider" "dappco.re/go/core/api/pkg/provider"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )

View file

@ -8,7 +8,7 @@ import (
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
"dappco.re/go/mcp/pkg/mcp/ide" "dappco.re/go/mcp/pkg/mcp/ide"
coreerr "forge.lthn.ai/core/go-log" coreerr "dappco.re/go/core/log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )

View file

@ -3,13 +3,12 @@
package mcp package mcp
import ( import (
"errors"
"net/http" "net/http"
core "dappco.re/go/core" core "dappco.re/go/core"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
api "forge.lthn.ai/core/api" api "dappco.re/go/core/api"
) )
// maxBodySize is the maximum request body size accepted by bridged tool endpoints. // maxBodySize is the maximum request body size accepted by bridged tool endpoints.
@ -48,7 +47,7 @@ func BridgeToAPI(svc *Service, bridge *api.ToolBridge) {
if !r.OK { if !r.OK {
if err, ok := r.Value.(error); ok { if err, ok := r.Value.(error); ok {
var maxBytesErr *http.MaxBytesError var maxBytesErr *http.MaxBytesError
if errors.As(err, &maxBytesErr) || core.Contains(err.Error(), "request body too large") { if core.As(err, &maxBytesErr) || core.Contains(err.Error(), "request body too large") {
c.JSON(http.StatusRequestEntityTooLarge, api.Fail("request_too_large", "Request body exceeds 10 MB limit")) c.JSON(http.StatusRequestEntityTooLarge, api.Fail("request_too_large", "Request body exceeds 10 MB limit"))
return return
} }
@ -63,7 +62,7 @@ func BridgeToAPI(svc *Service, bridge *api.ToolBridge) {
if err != nil { if err != nil {
// Body present + error = likely bad input (malformed JSON). // Body present + error = likely bad input (malformed JSON).
// No body + error = tool execution failure. // No body + error = tool execution failure.
if errors.Is(err, errInvalidRESTInput) { if core.Is(err, errInvalidRESTInput) {
c.JSON(http.StatusBadRequest, api.Fail("invalid_input", "Malformed JSON in request body")) c.JSON(http.StatusBadRequest, api.Fail("invalid_input", "Malformed JSON in request body"))
return return
} }

View file

@ -17,7 +17,7 @@ import (
"dappco.re/go/mcp/pkg/mcp/agentic" "dappco.re/go/mcp/pkg/mcp/agentic"
"dappco.re/go/mcp/pkg/mcp/brain" "dappco.re/go/mcp/pkg/mcp/brain"
"dappco.re/go/mcp/pkg/mcp/ide" "dappco.re/go/mcp/pkg/mcp/ide"
api "forge.lthn.ai/core/api" api "dappco.re/go/core/api"
) )
func init() { func init() {
@ -81,13 +81,16 @@ func TestBridgeToAPI_Good_DescribableGroup(t *testing.T) {
var dg api.DescribableGroup = bridge var dg api.DescribableGroup = bridge
descs := dg.Describe() descs := dg.Describe()
if len(descs) != len(svc.Tools()) { // ToolBridge.Describe prepends a GET entry describing the tool listing
t.Fatalf("expected %d descriptions, got %d", len(svc.Tools()), len(descs)) // endpoint, so the expected count is svc.Tools() + 1.
wantDescs := len(svc.Tools()) + 1
if len(descs) != wantDescs {
t.Fatalf("expected %d descriptions, got %d", wantDescs, len(descs))
} }
for _, d := range descs { for _, d := range descs {
if d.Method != "POST" { if d.Method != "POST" && d.Method != "GET" {
t.Errorf("expected Method=POST for %s, got %q", d.Path, d.Method) t.Errorf("expected Method=POST or GET for %s, got %q", d.Path, d.Method)
} }
if d.Summary == "" { if d.Summary == "" {
t.Errorf("expected non-empty Summary for %s", d.Path) t.Errorf("expected non-empty Summary for %s", d.Path)
@ -250,7 +253,7 @@ func TestBridgeToAPI_Good_EndToEnd(t *testing.T) {
} }
// Verify a tool endpoint is reachable through the engine. // Verify a tool endpoint is reachable through the engine.
resp2, err := http.Post(srv.URL+"/tools/lang_list", "application/json", nil) resp2, err := http.Post(srv.URL+"/tools/lang_list", "application/json", strings.NewReader("{}"))
if err != nil { if err != nil {
t.Fatalf("lang_list request failed: %v", err) t.Fatalf("lang_list request failed: %v", err)
} }

View file

@ -9,8 +9,8 @@ import (
"sync" "sync"
"time" "time"
coreerr "forge.lthn.ai/core/go-log" coreerr "dappco.re/go/core/log"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )

View file

@ -11,7 +11,7 @@ import (
"testing" "testing"
"time" "time"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )

View file

@ -4,14 +4,13 @@ package ide
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"time" "time"
core "dappco.re/go/core" core "dappco.re/go/core"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreerr "forge.lthn.ai/core/go-log" coreerr "dappco.re/go/core/log"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
) )
// errBridgeNotAvailable is returned when a tool requires the Laravel bridge // errBridgeNotAvailable is returned when a tool requires the Laravel bridge
@ -556,7 +555,7 @@ func stringFromAny(v any) string {
switch value := v.(type) { switch value := v.(type) {
case string: case string:
return value return value
case fmt.Stringer: case interface{ String() string }:
return value.String() return value.String()
default: default:
return "" return ""

View file

@ -7,7 +7,7 @@ import (
"time" "time"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
coreerr "forge.lthn.ai/core/go-log" coreerr "dappco.re/go/core/log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )

View file

@ -4,6 +4,7 @@ package ide
import ( import (
"context" "context"
"sync"
"time" "time"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
@ -86,6 +87,46 @@ type DashboardMetricsOutput struct {
Metrics DashboardMetrics `json:"metrics"` Metrics DashboardMetrics `json:"metrics"`
} }
// DashboardStateInput is the input for ide_dashboard_state.
//
// input := DashboardStateInput{}
type DashboardStateInput struct{}
// DashboardStateOutput is the output for ide_dashboard_state.
//
// // out.State["theme"] == "dark"
type DashboardStateOutput struct {
State map[string]any `json:"state"` // arbitrary key/value map
UpdatedAt time.Time `json:"updatedAt"` // when the state last changed
}
// DashboardUpdateInput is the input for ide_dashboard_update.
//
// input := DashboardUpdateInput{
// State: map[string]any{"theme": "light", "sidebar": true},
// Replace: false,
// }
type DashboardUpdateInput struct {
State map[string]any `json:"state"` // partial or full state
Replace bool `json:"replace,omitempty"` // true to overwrite, false to merge (default)
}
// DashboardUpdateOutput is the output for ide_dashboard_update.
//
// // out.State reflects the merged/replaced state
type DashboardUpdateOutput struct {
State map[string]any `json:"state"` // merged state after the update
UpdatedAt time.Time `json:"updatedAt"` // when the state was applied
}
// dashboardStateStore holds the mutable dashboard UI state shared between the
// IDE frontend and MCP callers. Access is guarded by dashboardStateMu.
var (
dashboardStateMu sync.RWMutex
dashboardStateStore = map[string]any{}
dashboardStateUpdated time.Time
)
func (s *Subsystem) registerDashboardTools(svc *coremcp.Service) { func (s *Subsystem) registerDashboardTools(svc *coremcp.Service) {
server := svc.Server() server := svc.Server()
coremcp.AddToolRecorded(svc, server, "ide", &mcp.Tool{ coremcp.AddToolRecorded(svc, server, "ide", &mcp.Tool{
@ -102,6 +143,16 @@ func (s *Subsystem) registerDashboardTools(svc *coremcp.Service) {
Name: "ide_dashboard_metrics", Name: "ide_dashboard_metrics",
Description: "Get aggregate build and agent metrics for a time period", Description: "Get aggregate build and agent metrics for a time period",
}, s.dashboardMetrics) }, s.dashboardMetrics)
coremcp.AddToolRecorded(svc, server, "ide", &mcp.Tool{
Name: "ide_dashboard_state",
Description: "Get the current dashboard UI state (arbitrary key/value map shared with the IDE).",
}, s.dashboardState)
coremcp.AddToolRecorded(svc, server, "ide", &mcp.Tool{
Name: "ide_dashboard_update",
Description: "Update the dashboard UI state. Merges into existing state by default; set replace=true to overwrite.",
}, s.dashboardUpdate)
} }
// dashboardOverview returns a platform overview with bridge status and // dashboardOverview returns a platform overview with bridge status and
@ -211,3 +262,79 @@ func (s *Subsystem) dashboardMetrics(_ context.Context, _ *mcp.CallToolRequest,
}, },
}, nil }, nil
} }
// dashboardState returns the current dashboard UI state as a snapshot.
//
// out := s.dashboardState(ctx, nil, DashboardStateInput{})
func (s *Subsystem) dashboardState(_ context.Context, _ *mcp.CallToolRequest, _ DashboardStateInput) (*mcp.CallToolResult, DashboardStateOutput, error) {
dashboardStateMu.RLock()
defer dashboardStateMu.RUnlock()
snapshot := make(map[string]any, len(dashboardStateStore))
for k, v := range dashboardStateStore {
snapshot[k] = v
}
return nil, DashboardStateOutput{
State: snapshot,
UpdatedAt: dashboardStateUpdated,
}, nil
}
// dashboardUpdate merges or replaces the dashboard UI state and emits an
// activity event so the IDE can react to the change.
//
// out := s.dashboardUpdate(ctx, nil, DashboardUpdateInput{State: map[string]any{"theme": "dark"}})
func (s *Subsystem) dashboardUpdate(ctx context.Context, _ *mcp.CallToolRequest, input DashboardUpdateInput) (*mcp.CallToolResult, DashboardUpdateOutput, error) {
now := time.Now()
dashboardStateMu.Lock()
if input.Replace || dashboardStateStore == nil {
dashboardStateStore = make(map[string]any, len(input.State))
}
for k, v := range input.State {
dashboardStateStore[k] = v
}
dashboardStateUpdated = now
snapshot := make(map[string]any, len(dashboardStateStore))
for k, v := range dashboardStateStore {
snapshot[k] = v
}
dashboardStateMu.Unlock()
// Record the change on the activity feed so ide_dashboard_activity
// reflects state transitions alongside build/session events.
s.recordActivity("dashboard_state", "dashboard state updated")
// Push the update over the Laravel bridge when available so web clients
// stay in sync with desktop tooling.
if s.bridge != nil {
_ = s.bridge.Send(BridgeMessage{
Type: "dashboard_update",
Data: snapshot,
})
}
// Surface the change on the shared MCP notifier so connected sessions
// receive a JSON-RPC notification alongside the tool response.
if s.notifier != nil {
s.notifier.ChannelSend(ctx, "dashboard.state.updated", map[string]any{
"state": snapshot,
"updatedAt": now,
})
}
return nil, DashboardUpdateOutput{
State: snapshot,
UpdatedAt: now,
}, nil
}
// resetDashboardState clears the shared dashboard state. Intended for tests.
func resetDashboardState() {
dashboardStateMu.Lock()
defer dashboardStateMu.Unlock()
dashboardStateStore = map[string]any{}
dashboardStateUpdated = time.Time{}
}

View file

@ -9,7 +9,7 @@ import (
"time" "time"
coremcp "dappco.re/go/mcp/pkg/mcp" coremcp "dappco.re/go/mcp/pkg/mcp"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
) )
// --- Helpers --- // --- Helpers ---
@ -949,3 +949,76 @@ func TestChatSend_Good_BridgeMessageType(t *testing.T) {
t.Fatal("timed out waiting for bridge message") t.Fatal("timed out waiting for bridge message")
} }
} }
// TestToolsDashboard_DashboardState_Good returns an empty state when the
// store has not been touched.
func TestToolsDashboard_DashboardState_Good(t *testing.T) {
t.Cleanup(resetDashboardState)
sub := newNilBridgeSubsystem()
_, out, err := sub.dashboardState(context.Background(), nil, DashboardStateInput{})
if err != nil {
t.Fatalf("dashboardState failed: %v", err)
}
if len(out.State) != 0 {
t.Fatalf("expected empty state, got %v", out.State)
}
}
// TestToolsDashboard_DashboardUpdate_Good merges the supplied state into the
// shared store and reflects it back on a subsequent dashboardState call.
func TestToolsDashboard_DashboardUpdate_Good(t *testing.T) {
t.Cleanup(resetDashboardState)
sub := newNilBridgeSubsystem()
_, updateOut, err := sub.dashboardUpdate(context.Background(), nil, DashboardUpdateInput{
State: map[string]any{"theme": "dark"},
})
if err != nil {
t.Fatalf("dashboardUpdate failed: %v", err)
}
if updateOut.State["theme"] != "dark" {
t.Fatalf("expected theme 'dark', got %v", updateOut.State["theme"])
}
_, readOut, err := sub.dashboardState(context.Background(), nil, DashboardStateInput{})
if err != nil {
t.Fatalf("dashboardState failed: %v", err)
}
if readOut.State["theme"] != "dark" {
t.Fatalf("expected persisted theme 'dark', got %v", readOut.State["theme"])
}
if readOut.UpdatedAt.IsZero() {
t.Fatal("expected non-zero UpdatedAt after update")
}
}
// TestToolsDashboard_DashboardUpdate_Ugly replaces (not merges) prior state
// when Replace=true.
func TestToolsDashboard_DashboardUpdate_Ugly(t *testing.T) {
t.Cleanup(resetDashboardState)
sub := newNilBridgeSubsystem()
_, _, err := sub.dashboardUpdate(context.Background(), nil, DashboardUpdateInput{
State: map[string]any{"theme": "dark", "sidebar": true},
})
if err != nil {
t.Fatalf("seed dashboardUpdate failed: %v", err)
}
_, out, err := sub.dashboardUpdate(context.Background(), nil, DashboardUpdateInput{
State: map[string]any{"theme": "light"},
Replace: true,
})
if err != nil {
t.Fatalf("replace dashboardUpdate failed: %v", err)
}
if _, ok := out.State["sidebar"]; ok {
t.Fatal("expected sidebar to be removed after replace")
}
if out.State["theme"] != "light" {
t.Fatalf("expected theme 'light', got %v", out.State["theme"])
}
}

View file

@ -5,22 +5,20 @@
package mcp package mcp
import ( import (
"cmp"
"context" "context"
"errors"
"iter" "iter"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"slices" "slices"
"sort"
"strings"
"sync" "sync"
core "dappco.re/go/core" core "dappco.re/go/core"
"forge.lthn.ai/core/go-io" "dappco.re/go/core/io"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
"forge.lthn.ai/core/go-process" "dappco.re/go/core/process"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -74,7 +72,8 @@ func New(opts Options) (*Service, error) {
server := mcp.NewServer(impl, &mcp.ServerOptions{ server := mcp.NewServer(impl, &mcp.ServerOptions{
Capabilities: &mcp.ServerCapabilities{ Capabilities: &mcp.ServerCapabilities{
Tools: &mcp.ToolCapabilities{ListChanged: true}, Resources: &mcp.ResourceCapabilities{ListChanged: false},
Tools: &mcp.ToolCapabilities{ListChanged: false},
Logging: &mcp.LoggingCapabilities{}, Logging: &mcp.LoggingCapabilities{},
Experimental: channelCapability(), Experimental: channelCapability(),
}, },
@ -245,15 +244,15 @@ func (s *Service) resolveWorkspacePath(path string) string {
} }
if s.workspaceRoot == "" { if s.workspaceRoot == "" {
return filepath.Clean(path) return core.CleanPath(path, "/")
} }
clean := filepath.Clean(string(filepath.Separator) + path) clean := core.CleanPath(string(filepath.Separator)+path, "/")
clean = strings.TrimPrefix(clean, string(filepath.Separator)) clean = core.TrimPrefix(clean, string(filepath.Separator))
if clean == "." || clean == "" { if clean == "." || clean == "" {
return s.workspaceRoot return s.workspaceRoot
} }
return filepath.Join(s.workspaceRoot, clean) return core.Path(s.workspaceRoot, clean)
} }
// registerTools adds the built-in tool groups to the MCP server. // registerTools adds the built-in tool groups to the MCP server.
@ -317,6 +316,7 @@ func (s *Service) registerTools(server *mcp.Server) {
s.registerProcessTools(server) s.registerProcessTools(server)
s.registerWebviewTools(server) s.registerWebviewTools(server)
s.registerWSTools(server) s.registerWSTools(server)
s.registerWSClientTools(server)
} }
// Tool input/output types for MCP file operations. // Tool input/output types for MCP file operations.
@ -543,8 +543,8 @@ func (s *Service) listDirectory(ctx context.Context, req *mcp.CallToolRequest, i
if err != nil { if err != nil {
return nil, ListDirectoryOutput{}, log.E("mcp.listDirectory", "failed to list directory", err) return nil, ListDirectoryOutput{}, log.E("mcp.listDirectory", "failed to list directory", err)
} }
sort.Slice(entries, func(i, j int) bool { slices.SortFunc(entries, func(a, b os.DirEntry) int {
return entries[i].Name() < entries[j].Name() return cmp.Compare(a.Name(), b.Name())
}) })
result := make([]DirectoryEntry, 0, len(entries)) result := make([]DirectoryEntry, 0, len(entries))
for _, e := range entries { for _, e := range entries {
@ -615,7 +615,7 @@ func (s *Service) fileExists(ctx context.Context, req *mcp.CallToolRequest, inpu
info, err := s.medium.Stat(input.Path) info, err := s.medium.Stat(input.Path)
if err != nil { if err != nil {
if errors.Is(err, os.ErrNotExist) { if core.Is(err, os.ErrNotExist) {
return nil, FileExistsOutput{Exists: false, IsDir: false, Path: input.Path}, nil return nil, FileExistsOutput{Exists: false, IsDir: false, Path: input.Path}, nil
} }
return nil, FileExistsOutput{}, log.E("mcp.fileExists", "failed to stat path", err) return nil, FileExistsOutput{}, log.E("mcp.fileExists", "failed to stat path", err)

View file

@ -7,17 +7,17 @@
package mcp package mcp
import ( import (
"cmp"
"context" "context"
"io" "io"
"iter" "iter"
"os" "os"
"reflect" "reflect"
"slices" "slices"
"sort"
"strings"
"sync" "sync"
"unsafe" "unsafe"
core "dappco.re/go/core"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -203,7 +203,7 @@ func (s *Service) ChannelSend(ctx context.Context, channel string, data any) {
if s == nil || s.server == nil { if s == nil || s.server == nil {
return return
} }
if strings.TrimSpace(channel) == "" { if core.Trim(channel) == "" {
return return
} }
ctx = normalizeNotificationContext(ctx) ctx = normalizeNotificationContext(ctx)
@ -218,7 +218,7 @@ func (s *Service) ChannelSendToSession(ctx context.Context, session *mcp.ServerS
if s == nil || s.server == nil || session == nil { if s == nil || s.server == nil || session == nil {
return return
} }
if strings.TrimSpace(channel) == "" { if core.Trim(channel) == "" {
return return
} }
ctx = normalizeNotificationContext(ctx) ctx = normalizeNotificationContext(ctx)
@ -275,6 +275,15 @@ func (s *Service) debugNotify(msg string, args ...any) {
s.logger.Debug(msg, args...) s.logger.Debug(msg, args...)
} }
// NotifySession sends a raw JSON-RPC notification to a specific MCP session.
//
// coremcp.NotifySession(ctx, session, "notifications/claude/channel", map[string]any{
// "content": "build failed", "meta": map[string]string{"severity": "high"},
// })
func NotifySession(ctx context.Context, session *mcp.ServerSession, method string, payload any) error {
return sendSessionNotification(ctx, session, method, payload)
}
func sendSessionNotification(ctx context.Context, session *mcp.ServerSession, method string, payload any) error { func sendSessionNotification(ctx context.Context, session *mcp.ServerSession, method string, payload any) error {
if session == nil { if session == nil {
return nil return nil
@ -353,8 +362,8 @@ func snapshotSessions(server *mcp.Server) []*mcp.ServerSession {
} }
} }
sort.Slice(sessions, func(i, j int) bool { slices.SortFunc(sessions, func(a, b *mcp.ServerSession) int {
return sessions[i].ID() < sessions[j].ID() return cmp.Compare(a.ID(), b.ID())
}) })
return sessions return sessions

View file

@ -4,9 +4,9 @@ package mcp
import ( import (
"context" "context"
"path/filepath"
"strings"
"time" "time"
core "dappco.re/go/core"
) )
type processRuntime struct { type processRuntime struct {
@ -50,19 +50,20 @@ func (s *Service) forgetProcessRuntime(id string) {
} }
func isTestProcess(command string, args []string) bool { func isTestProcess(command string, args []string) bool {
base := strings.ToLower(filepath.Base(command)) base := core.Lower(core.PathBase(command))
if base == "" { if base == "" {
return false return false
} }
switch base { switch base {
case "go": case "go":
return len(args) > 0 && strings.EqualFold(args[0], "test") return len(args) > 0 && core.Lower(args[0]) == "test"
case "cargo": case "cargo":
return len(args) > 0 && strings.EqualFold(args[0], "test") return len(args) > 0 && core.Lower(args[0]) == "test"
case "npm", "pnpm", "yarn", "bun": case "npm", "pnpm", "yarn", "bun":
for _, arg := range args { for _, arg := range args {
if strings.EqualFold(arg, "test") || strings.HasPrefix(strings.ToLower(arg), "test:") { lower := core.Lower(arg)
if lower == "test" || core.HasPrefix(lower, "test:") {
return true return true
} }
} }

View file

@ -7,8 +7,8 @@ import (
"time" "time"
core "dappco.re/go/core" core "dappco.re/go/core"
"forge.lthn.ai/core/go-process" "dappco.re/go/core/process"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
) )
// Register is the service factory for core.WithService. // Register is the service factory for core.WithService.

View file

@ -9,8 +9,8 @@ import (
"time" "time"
"dappco.re/go/core" "dappco.re/go/core"
"forge.lthn.ai/core/go-process" "dappco.re/go/core/process"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
) )
func TestRegister_Good_WiresOptionalServices(t *testing.T) { func TestRegister_Good_WiresOptionalServices(t *testing.T) {

View file

@ -78,7 +78,43 @@ type ToolRecord struct {
// return nil, ReadFileOutput{Path: "src/main.go"}, nil // return nil, ReadFileOutput{Path: "src/main.go"}, nil
// }) // })
func AddToolRecorded[In, Out any](s *Service, server *mcp.Server, group string, t *mcp.Tool, h mcp.ToolHandlerFor[In, Out]) { func AddToolRecorded[In, Out any](s *Service, server *mcp.Server, group string, t *mcp.Tool, h mcp.ToolHandlerFor[In, Out]) {
mcp.AddTool(server, t, h) // Set inputSchema from struct reflection if not already set.
// Use server.AddTool (non-generic) to avoid auto-generated outputSchema.
// The go-sdk's generic mcp.AddTool generates outputSchema from the Out type,
// but Claude Code's protocol (2025-03-26) doesn't support outputSchema.
// Removing it reduces tools/list from 214KB to ~74KB.
if t.InputSchema == nil {
t.InputSchema = structSchema(new(In))
if t.InputSchema == nil {
t.InputSchema = map[string]any{"type": "object"}
}
}
// Wrap the typed handler into a generic ToolHandler.
wrapped := func(ctx context.Context, req *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var input In
if req != nil && len(req.Params.Arguments) > 0 {
if r := core.JSONUnmarshal(req.Params.Arguments, &input); !r.OK {
if err, ok := r.Value.(error); ok {
return nil, err
}
}
}
if err := s.authorizeToolAccess(ctx, req, t.Name, input); err != nil {
return nil, err
}
result, output, err := h(ctx, req, input)
if err != nil {
return nil, err
}
if result != nil {
return result, nil
}
data := core.JSONMarshalString(output)
return &mcp.CallToolResult{
Content: []mcp.Content{&mcp.TextContent{Text: data}},
}, nil
}
server.AddTool(t, wrapped)
restHandler := func(ctx context.Context, body []byte) (any, error) { restHandler := func(ctx context.Context, body []byte) (any, error) {
var input In var input In

View file

@ -7,7 +7,7 @@ import (
"errors" "errors"
"testing" "testing"
"forge.lthn.ai/core/go-process" "dappco.re/go/core/process"
) )
func TestToolRegistry_Good_RecordsTools(t *testing.T) { func TestToolRegistry_Good_RecordsTools(t *testing.T) {
@ -71,13 +71,19 @@ func TestToolRegistry_Good_ToolCount(t *testing.T) {
} }
tools := svc.Tools() tools := svc.Tools()
// Built-in tools: file_read, file_write, file_delete, file_rename, // Built-in tools (no ProcessService / WSHub / Subsystems):
// file_exists, file_edit, dir_list, dir_create, lang_detect, lang_list, // files (8): file_read, file_write, file_delete, file_rename,
// metrics_record, metrics_query, rag_query, rag_ingest, rag_collections, // file_exists, file_edit, dir_list, dir_create
// webview_connect, webview_disconnect, webview_navigate, webview_click, // language (2): lang_detect, lang_list
// webview_type, webview_query, webview_console, webview_eval, // metrics (2): metrics_record, metrics_query
// webview_screenshot, webview_wait // rag (6): rag_query, rag_search, rag_ingest, rag_index,
const expectedCount = 25 // rag_retrieve, rag_collections
// webview (12): webview_connect, webview_disconnect, webview_navigate,
// webview_click, webview_type, webview_query,
// webview_console, webview_eval, webview_screenshot,
// webview_wait, webview_render, webview_update
// ws (3): ws_connect, ws_send, ws_close
const expectedCount = 33
if len(tools) != expectedCount { if len(tools) != expectedCount {
t.Errorf("expected %d tools, got %d", expectedCount, len(tools)) t.Errorf("expected %d tools, got %d", expectedCount, len(tools))
for _, tr := range tools { for _, tr := range tools {
@ -95,8 +101,8 @@ func TestToolRegistry_Good_GroupAssignment(t *testing.T) {
fileTools := []string{"file_read", "file_write", "file_delete", "file_rename", "file_exists", "file_edit", "dir_list", "dir_create"} fileTools := []string{"file_read", "file_write", "file_delete", "file_rename", "file_exists", "file_edit", "dir_list", "dir_create"}
langTools := []string{"lang_detect", "lang_list"} langTools := []string{"lang_detect", "lang_list"}
metricsTools := []string{"metrics_record", "metrics_query"} metricsTools := []string{"metrics_record", "metrics_query"}
ragTools := []string{"rag_query", "rag_ingest", "rag_collections"} ragTools := []string{"rag_query", "rag_search", "rag_ingest", "rag_index", "rag_retrieve", "rag_collections"}
webviewTools := []string{"webview_connect", "webview_disconnect", "webview_navigate", "webview_click", "webview_type", "webview_query", "webview_console", "webview_eval", "webview_screenshot", "webview_wait"} webviewTools := []string{"webview_connect", "webview_disconnect", "webview_navigate", "webview_click", "webview_type", "webview_query", "webview_console", "webview_eval", "webview_screenshot", "webview_wait", "webview_render", "webview_update"}
byName := make(map[string]ToolRecord) byName := make(map[string]ToolRecord)
for _, tr := range svc.Tools() { for _, tr := range svc.Tools() {
@ -157,6 +163,18 @@ func TestToolRegistry_Good_GroupAssignment(t *testing.T) {
t.Errorf("tool %s: expected group 'webview', got %q", name, tr.Group) t.Errorf("tool %s: expected group 'webview', got %q", name, tr.Group)
} }
} }
wsClientTools := []string{"ws_connect", "ws_send", "ws_close"}
for _, name := range wsClientTools {
tr, ok := byName[name]
if !ok {
t.Errorf("tool %s not found in registry", name)
continue
}
if tr.Group != "ws" {
t.Errorf("tool %s: expected group 'ws', got %q", name, tr.Group)
}
}
} }
func TestToolRegistry_Good_ToolRecordFields(t *testing.T) { func TestToolRegistry_Good_ToolRecordFields(t *testing.T) {

View file

@ -8,8 +8,8 @@ import (
"time" "time"
core "dappco.re/go/core" core "dappco.re/go/core"
"forge.lthn.ai/core/go-ai/ai" "dappco.re/go/core/ai/ai"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )

View file

@ -6,8 +6,8 @@ import (
"context" "context"
"time" "time"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
"forge.lthn.ai/core/go-process" "dappco.re/go/core/process"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -29,6 +29,32 @@ type ProcessStartInput struct {
Env []string `json:"env,omitempty"` // e.g. ["CGO_ENABLED=0"] Env []string `json:"env,omitempty"` // e.g. ["CGO_ENABLED=0"]
} }
// ProcessRunInput contains parameters for running a command to completion
// and returning its captured output.
//
// input := ProcessRunInput{
// Command: "go",
// Args: []string{"test", "./..."},
// Dir: "/home/user/project",
// Env: []string{"CGO_ENABLED=0"},
// }
type ProcessRunInput struct {
Command string `json:"command"` // e.g. "go"
Args []string `json:"args,omitempty"` // e.g. ["test", "./..."]
Dir string `json:"dir,omitempty"` // e.g. "/home/user/project"
Env []string `json:"env,omitempty"` // e.g. ["CGO_ENABLED=0"]
}
// ProcessRunOutput contains the result of running a process to completion.
//
// // out.ID == "proc-abc123", out.ExitCode == 0, out.Output == "PASS\n..."
type ProcessRunOutput struct {
ID string `json:"id"` // e.g. "proc-abc123"
ExitCode int `json:"exitCode"` // 0 on success
Output string `json:"output"` // combined stdout/stderr
Command string `json:"command"` // e.g. "go"
}
// ProcessStartOutput contains the result of starting a process. // ProcessStartOutput contains the result of starting a process.
// //
// // out.ID == "proc-abc123", out.PID == 54321, out.Command == "go" // // out.ID == "proc-abc123", out.PID == 54321, out.Command == "go"
@ -146,6 +172,11 @@ func (s *Service) registerProcessTools(server *mcp.Server) bool {
Description: "Start a new external process. Returns process ID for tracking.", Description: "Start a new external process. Returns process ID for tracking.",
}, s.processStart) }, s.processStart)
addToolRecorded(s, server, "process", &mcp.Tool{
Name: "process_run",
Description: "Run a command to completion and return the captured output. Blocks until the process exits.",
}, s.processRun)
addToolRecorded(s, server, "process", &mcp.Tool{ addToolRecorded(s, server, "process", &mcp.Tool{
Name: "process_stop", Name: "process_stop",
Description: "Gracefully stop a running process by ID.", Description: "Gracefully stop a running process by ID.",
@ -224,6 +255,63 @@ func (s *Service) processStart(ctx context.Context, req *mcp.CallToolRequest, in
return nil, output, nil return nil, output, nil
} }
// processRun handles the process_run tool call.
// Executes the command to completion and returns the captured output.
func (s *Service) processRun(ctx context.Context, req *mcp.CallToolRequest, input ProcessRunInput) (*mcp.CallToolResult, ProcessRunOutput, error) {
if s.processService == nil {
return nil, ProcessRunOutput{}, log.E("processRun", "process service unavailable", nil)
}
s.logger.Security("MCP tool execution", "tool", "process_run", "command", input.Command, "args", input.Args, "dir", input.Dir, "user", log.Username())
if input.Command == "" {
return nil, ProcessRunOutput{}, log.E("processRun", "command cannot be empty", nil)
}
opts := process.RunOptions{
Command: input.Command,
Args: input.Args,
Dir: s.resolveWorkspacePath(input.Dir),
Env: input.Env,
}
proc, err := s.processService.StartWithOptions(ctx, opts)
if err != nil {
log.Error("mcp: process run start failed", "command", input.Command, "err", err)
return nil, ProcessRunOutput{}, log.E("processRun", "failed to start process", err)
}
info := proc.Info()
s.recordProcessRuntime(proc.ID, processRuntime{
Command: proc.Command,
Args: proc.Args,
Dir: info.Dir,
StartedAt: proc.StartedAt,
})
s.ChannelSend(ctx, ChannelProcessStart, map[string]any{
"id": proc.ID,
"pid": info.PID,
"command": proc.Command,
"args": proc.Args,
"dir": info.Dir,
"startedAt": proc.StartedAt,
})
// Wait for completion (context-aware).
select {
case <-ctx.Done():
return nil, ProcessRunOutput{}, log.E("processRun", "cancelled", ctx.Err())
case <-proc.Done():
}
return nil, ProcessRunOutput{
ID: proc.ID,
ExitCode: proc.ExitCode,
Output: proc.Output(),
Command: proc.Command,
}, nil
}
// processStop handles the process_stop tool call. // processStop handles the process_stop tool call.
func (s *Service) processStop(ctx context.Context, req *mcp.CallToolRequest, input ProcessStopInput) (*mcp.CallToolResult, ProcessStopOutput, error) { func (s *Service) processStop(ctx context.Context, req *mcp.CallToolRequest, input ProcessStopInput) (*mcp.CallToolResult, ProcessStopOutput, error) {
if s.processService == nil { if s.processService == nil {

View file

@ -9,7 +9,7 @@ import (
"time" "time"
"dappco.re/go/core" "dappco.re/go/core"
"forge.lthn.ai/core/go-process" "dappco.re/go/core/process"
) )
// newTestProcessService creates a real process.Service backed by a core.Core for CI tests. // newTestProcessService creates a real process.Service backed by a core.Core for CI tests.

View file

@ -301,3 +301,57 @@ func TestRegisterProcessTools_Bad_NilService(t *testing.T) {
t.Error("Expected registerProcessTools to return false when processService is nil") t.Error("Expected registerProcessTools to return false when processService is nil")
} }
} }
// TestToolsProcess_ProcessRunInput_Good exercises the process_run input DTO shape.
func TestToolsProcess_ProcessRunInput_Good(t *testing.T) {
input := ProcessRunInput{
Command: "echo",
Args: []string{"hello"},
Dir: "/tmp",
Env: []string{"FOO=bar"},
}
if input.Command != "echo" {
t.Errorf("expected command 'echo', got %q", input.Command)
}
if len(input.Args) != 1 || input.Args[0] != "hello" {
t.Errorf("expected args [hello], got %v", input.Args)
}
if input.Dir != "/tmp" {
t.Errorf("expected dir '/tmp', got %q", input.Dir)
}
if len(input.Env) != 1 {
t.Errorf("expected 1 env, got %d", len(input.Env))
}
}
// TestToolsProcess_ProcessRunOutput_Good exercises the process_run output DTO shape.
func TestToolsProcess_ProcessRunOutput_Good(t *testing.T) {
output := ProcessRunOutput{
ID: "proc-1",
ExitCode: 0,
Output: "hello\n",
Command: "echo",
}
if output.ID != "proc-1" {
t.Errorf("expected id 'proc-1', got %q", output.ID)
}
if output.ExitCode != 0 {
t.Errorf("expected exit code 0, got %d", output.ExitCode)
}
if output.Output != "hello\n" {
t.Errorf("expected output 'hello\\n', got %q", output.Output)
}
}
// TestToolsProcess_ProcessRun_Bad rejects calls without a process service.
func TestToolsProcess_ProcessRun_Bad(t *testing.T) {
svc, err := New(Options{})
if err != nil {
t.Fatal(err)
}
_, _, err = svc.processRun(t.Context(), nil, ProcessRunInput{Command: "echo", Args: []string{"hi"}})
if err == nil {
t.Fatal("expected error when process service is unavailable")
}
}

View file

@ -6,8 +6,8 @@ import (
"context" "context"
core "dappco.re/go/core" core "dappco.re/go/core"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
"forge.lthn.ai/core/go-rag" "dappco.re/go/core/rag"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -83,6 +83,30 @@ type RAGCollectionsInput struct {
ShowStats bool `json:"show_stats,omitempty"` // true to include point counts and status ShowStats bool `json:"show_stats,omitempty"` // true to include point counts and status
} }
// RAGRetrieveInput contains parameters for retrieving chunks from a specific
// document source (rather than running a semantic query).
//
// input := RAGRetrieveInput{
// Source: "docs/services.md",
// Collection: "core-docs",
// Limit: 20,
// }
type RAGRetrieveInput struct {
Source string `json:"source"` // e.g. "docs/services.md"
Collection string `json:"collection,omitempty"` // e.g. "core-docs" (default: "hostuk-docs")
Limit int `json:"limit,omitempty"` // e.g. 20 (default: 50)
}
// RAGRetrieveOutput contains document chunks for a specific source.
//
// // len(out.Chunks) == 12, out.Source == "docs/services.md"
type RAGRetrieveOutput struct {
Source string `json:"source"` // e.g. "docs/services.md"
Collection string `json:"collection"` // collection searched
Chunks []RAGQueryResult `json:"chunks"` // chunks for the source, ordered by chunkIndex
Count int `json:"count"` // number of chunks returned
}
// CollectionInfo contains information about a Qdrant collection. // CollectionInfo contains information about a Qdrant collection.
// //
// // ci.Name == "core-docs", ci.PointsCount == 1500, ci.Status == "green" // // ci.Name == "core-docs", ci.PointsCount == 1500, ci.Status == "green"
@ -106,11 +130,28 @@ func (s *Service) registerRAGTools(server *mcp.Server) {
Description: "Query the RAG vector database for relevant documentation. Returns semantically similar content based on the query.", Description: "Query the RAG vector database for relevant documentation. Returns semantically similar content based on the query.",
}, s.ragQuery) }, s.ragQuery)
// rag_search is the spec-aligned alias for rag_query.
addToolRecorded(s, server, "rag", &mcp.Tool{
Name: "rag_search",
Description: "Semantic search across documents in the RAG vector database. Returns chunks ranked by similarity.",
}, s.ragQuery)
addToolRecorded(s, server, "rag", &mcp.Tool{ addToolRecorded(s, server, "rag", &mcp.Tool{
Name: "rag_ingest", Name: "rag_ingest",
Description: "Ingest documents into the RAG vector database. Supports both single files and directories.", Description: "Ingest documents into the RAG vector database. Supports both single files and directories.",
}, s.ragIngest) }, s.ragIngest)
// rag_index is the spec-aligned alias for rag_ingest.
addToolRecorded(s, server, "rag", &mcp.Tool{
Name: "rag_index",
Description: "Index a document or directory into the RAG vector database.",
}, s.ragIngest)
addToolRecorded(s, server, "rag", &mcp.Tool{
Name: "rag_retrieve",
Description: "Retrieve chunks for a specific document source from the RAG vector database.",
}, s.ragRetrieve)
addToolRecorded(s, server, "rag", &mcp.Tool{ addToolRecorded(s, server, "rag", &mcp.Tool{
Name: "rag_collections", Name: "rag_collections",
Description: "List all available collections in the RAG vector database.", Description: "List all available collections in the RAG vector database.",
@ -216,6 +257,86 @@ func (s *Service) ragIngest(ctx context.Context, req *mcp.CallToolRequest, input
}, nil }, nil
} }
// ragRetrieve handles the rag_retrieve tool call.
// Returns chunks for a specific source path by querying the collection with
// the source path as the query text and then filtering results down to the
// matching source. This preserves the transport abstraction that the rest of
// the RAG tools use while producing the document-scoped view callers expect.
func (s *Service) ragRetrieve(ctx context.Context, req *mcp.CallToolRequest, input RAGRetrieveInput) (*mcp.CallToolResult, RAGRetrieveOutput, error) {
collection := input.Collection
if collection == "" {
collection = DefaultRAGCollection
}
limit := input.Limit
if limit <= 0 {
limit = 50
}
s.logger.Info("MCP tool execution", "tool", "rag_retrieve", "source", input.Source, "collection", collection, "limit", limit, "user", log.Username())
if input.Source == "" {
return nil, RAGRetrieveOutput{}, log.E("ragRetrieve", "source cannot be empty", nil)
}
// Use the source path as the query text — semantically related chunks
// will rank highly, and we then keep only chunks whose Source matches.
// Over-fetch by an order of magnitude so document-level limits are met
// even when the source appears beyond the top-K of the raw query.
overfetch := limit * 10
if overfetch < 100 {
overfetch = 100
}
results, err := rag.QueryDocs(ctx, input.Source, collection, overfetch)
if err != nil {
log.Error("mcp: rag retrieve query failed", "source", input.Source, "collection", collection, "err", err)
return nil, RAGRetrieveOutput{}, log.E("ragRetrieve", "failed to retrieve chunks", err)
}
chunks := make([]RAGQueryResult, 0, limit)
for _, r := range results {
if r.Source != input.Source {
continue
}
chunks = append(chunks, RAGQueryResult{
Content: r.Text,
Source: r.Source,
Section: r.Section,
Category: r.Category,
ChunkIndex: r.ChunkIndex,
Score: r.Score,
})
if len(chunks) >= limit {
break
}
}
sortChunksByIndex(chunks)
return nil, RAGRetrieveOutput{
Source: input.Source,
Collection: collection,
Chunks: chunks,
Count: len(chunks),
}, nil
}
// sortChunksByIndex sorts chunks in ascending order of chunk index.
// Stable ordering keeps ties by their original position.
func sortChunksByIndex(chunks []RAGQueryResult) {
if len(chunks) <= 1 {
return
}
// Insertion sort keeps the code dependency-free and is fast enough
// for the small result sets rag_retrieve is designed for.
for i := 1; i < len(chunks); i++ {
j := i
for j > 0 && chunks[j-1].ChunkIndex > chunks[j].ChunkIndex {
chunks[j-1], chunks[j] = chunks[j], chunks[j-1]
j--
}
}
}
// ragCollections handles the rag_collections tool call. // ragCollections handles the rag_collections tool call.
func (s *Service) ragCollections(ctx context.Context, req *mcp.CallToolRequest, input RAGCollectionsInput) (*mcp.CallToolResult, RAGCollectionsOutput, error) { func (s *Service) ragCollections(ctx context.Context, req *mcp.CallToolRequest, input RAGCollectionsInput) (*mcp.CallToolResult, RAGCollectionsOutput, error) {
s.logger.Info("MCP tool execution", "tool", "rag_collections", "show_stats", input.ShowStats, "user", log.Username()) s.logger.Info("MCP tool execution", "tool", "rag_collections", "show_stats", input.ShowStats, "user", log.Username())

View file

@ -171,3 +171,66 @@ func TestRAGCollectionsInput_ShowStats(t *testing.T) {
t.Error("Expected ShowStats to be true") t.Error("Expected ShowStats to be true")
} }
} }
// TestToolsRag_RAGRetrieveInput_Good exercises the rag_retrieve DTO defaults.
func TestToolsRag_RAGRetrieveInput_Good(t *testing.T) {
input := RAGRetrieveInput{
Source: "docs/index.md",
Collection: "core-docs",
Limit: 20,
}
if input.Source != "docs/index.md" {
t.Errorf("expected source docs/index.md, got %q", input.Source)
}
if input.Limit != 20 {
t.Errorf("expected limit 20, got %d", input.Limit)
}
}
// TestToolsRag_RAGRetrieveOutput_Good exercises the rag_retrieve output shape.
func TestToolsRag_RAGRetrieveOutput_Good(t *testing.T) {
output := RAGRetrieveOutput{
Source: "docs/index.md",
Collection: "core-docs",
Chunks: []RAGQueryResult{
{Content: "first", ChunkIndex: 0},
{Content: "second", ChunkIndex: 1},
},
Count: 2,
}
if output.Count != 2 {
t.Fatalf("expected count 2, got %d", output.Count)
}
if output.Chunks[1].ChunkIndex != 1 {
t.Fatalf("expected chunk 1, got %d", output.Chunks[1].ChunkIndex)
}
}
// TestToolsRag_SortChunksByIndex_Good verifies sort orders by chunk index ascending.
func TestToolsRag_SortChunksByIndex_Good(t *testing.T) {
chunks := []RAGQueryResult{
{ChunkIndex: 3},
{ChunkIndex: 1},
{ChunkIndex: 2},
}
sortChunksByIndex(chunks)
for i, want := range []int{1, 2, 3} {
if chunks[i].ChunkIndex != want {
t.Fatalf("index %d: expected chunk %d, got %d", i, want, chunks[i].ChunkIndex)
}
}
}
// TestToolsRag_RagRetrieve_Bad rejects empty source paths.
func TestToolsRag_RagRetrieve_Bad(t *testing.T) {
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, _, err = svc.ragRetrieve(t.Context(), nil, RAGRetrieveInput{})
if err == nil {
t.Fatal("expected error for empty source")
}
}

View file

@ -9,13 +9,12 @@ import (
"image" "image"
"image/jpeg" "image/jpeg"
_ "image/png" _ "image/png"
"strings"
"sync" "sync"
"time" "time"
core "dappco.re/go/core" core "dappco.re/go/core"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
"forge.lthn.ai/core/go-webview" "dappco.re/go/core/webview"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -271,6 +270,18 @@ func (s *Service) registerWebviewTools(server *mcp.Server) {
Name: "webview_wait", Name: "webview_wait",
Description: "Wait for an element to appear by CSS selector.", Description: "Wait for an element to appear by CSS selector.",
}, s.webviewWait) }, s.webviewWait)
// Embedded UI rendering — for pushing HTML/state to connected clients
// without requiring a Chrome DevTools connection.
addToolRecorded(s, server, "webview", &mcp.Tool{
Name: "webview_render",
Description: "Render HTML in an embedded webview by ID. Broadcasts to connected clients via the webview.render channel.",
}, s.webviewRender)
addToolRecorded(s, server, "webview", &mcp.Tool{
Name: "webview_update",
Description: "Update the HTML, title, or state of an embedded webview by ID. Broadcasts to connected clients via the webview.update channel.",
}, s.webviewUpdate)
} }
// webviewConnect handles the webview_connect tool call. // webviewConnect handles the webview_connect tool call.
@ -554,7 +565,7 @@ func (s *Service) webviewScreenshot(ctx context.Context, req *mcp.CallToolReques
if format == "" { if format == "" {
format = "png" format = "png"
} }
format = strings.ToLower(format) format = core.Lower(format)
data, err := webviewInstance.Screenshot() data, err := webviewInstance.Screenshot()
if err != nil { if err != nil {
@ -649,7 +660,7 @@ func waitForSelector(ctx context.Context, timeout time.Duration, selector string
if err == nil { if err == nil {
return nil return nil
} }
if !strings.Contains(err.Error(), "element not found") { if !core.Contains(err.Error(), "element not found") {
return err return err
} }

View file

@ -0,0 +1,233 @@
// SPDX-License-Identifier: EUPL-1.2
package mcp
import (
"context"
"sync"
"time"
core "dappco.re/go/core"
"dappco.re/go/core/log"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// WebviewRenderInput contains parameters for rendering an embedded
// HTML view. The named view is stored and broadcast so connected clients
// (Claude Code sessions, CoreGUI windows, HTTP/SSE subscribers) can
// display the content.
//
// input := WebviewRenderInput{
// ViewID: "dashboard",
// HTML: "<div id='app'>Loading...</div>",
// Title: "Agent Dashboard",
// Width: 1024,
// Height: 768,
// State: map[string]any{"theme": "dark"},
// }
type WebviewRenderInput struct {
ViewID string `json:"view_id"` // e.g. "dashboard"
HTML string `json:"html"` // rendered markup
Title string `json:"title,omitempty"` // e.g. "Agent Dashboard"
Width int `json:"width,omitempty"` // preferred width in pixels
Height int `json:"height,omitempty"` // preferred height in pixels
State map[string]any `json:"state,omitempty"` // initial view state
}
// WebviewRenderOutput reports the result of rendering an embedded view.
//
// // out.Success == true, out.ViewID == "dashboard"
type WebviewRenderOutput struct {
Success bool `json:"success"` // true when the view was stored and broadcast
ViewID string `json:"view_id"` // echoed view identifier
UpdatedAt time.Time `json:"updatedAt"` // when the view was rendered
}
// WebviewUpdateInput contains parameters for updating the state of an
// existing embedded view. Callers may provide HTML to replace the markup,
// patch fields in the view state, or do both.
//
// input := WebviewUpdateInput{
// ViewID: "dashboard",
// HTML: "<div id='app'>Ready</div>",
// State: map[string]any{"count": 42},
// Merge: true,
// }
type WebviewUpdateInput struct {
ViewID string `json:"view_id"` // e.g. "dashboard"
HTML string `json:"html,omitempty"` // replacement markup (optional)
Title string `json:"title,omitempty"` // e.g. "Agent Dashboard"
State map[string]any `json:"state,omitempty"` // partial state update
Merge bool `json:"merge,omitempty"` // merge state (default) or replace when false
}
// WebviewUpdateOutput reports the result of updating an embedded view.
//
// // out.Success == true, out.ViewID == "dashboard"
type WebviewUpdateOutput struct {
Success bool `json:"success"` // true when the view was updated and broadcast
ViewID string `json:"view_id"` // echoed view identifier
UpdatedAt time.Time `json:"updatedAt"` // when the view was last updated
}
// embeddedView captures the live state of a rendered UI view. Instances
// are kept per ViewID inside embeddedViewRegistry.
type embeddedView struct {
ViewID string
Title string
HTML string
Width int
Height int
State map[string]any
UpdatedAt time.Time
}
// embeddedViewRegistry stores the most recent render/update state for each
// view so new subscribers can pick up the current UI on connection.
// Operations are guarded by embeddedViewMu.
var (
embeddedViewMu sync.RWMutex
embeddedViewRegistry = map[string]*embeddedView{}
)
// ChannelWebviewRender is the channel used to broadcast webview_render events.
const ChannelWebviewRender = "webview.render"
// ChannelWebviewUpdate is the channel used to broadcast webview_update events.
const ChannelWebviewUpdate = "webview.update"
// webviewRender handles the webview_render tool call.
func (s *Service) webviewRender(ctx context.Context, req *mcp.CallToolRequest, input WebviewRenderInput) (*mcp.CallToolResult, WebviewRenderOutput, error) {
s.logger.Info("MCP tool execution", "tool", "webview_render", "view", input.ViewID, "user", log.Username())
if core.Trim(input.ViewID) == "" {
return nil, WebviewRenderOutput{}, log.E("webviewRender", "view_id is required", nil)
}
now := time.Now()
view := &embeddedView{
ViewID: input.ViewID,
Title: input.Title,
HTML: input.HTML,
Width: input.Width,
Height: input.Height,
State: cloneStateMap(input.State),
UpdatedAt: now,
}
embeddedViewMu.Lock()
embeddedViewRegistry[input.ViewID] = view
embeddedViewMu.Unlock()
s.ChannelSend(ctx, ChannelWebviewRender, map[string]any{
"view_id": view.ViewID,
"title": view.Title,
"html": view.HTML,
"width": view.Width,
"height": view.Height,
"state": cloneStateMap(view.State),
"updatedAt": view.UpdatedAt,
})
return nil, WebviewRenderOutput{
Success: true,
ViewID: view.ViewID,
UpdatedAt: view.UpdatedAt,
}, nil
}
// webviewUpdate handles the webview_update tool call.
func (s *Service) webviewUpdate(ctx context.Context, req *mcp.CallToolRequest, input WebviewUpdateInput) (*mcp.CallToolResult, WebviewUpdateOutput, error) {
s.logger.Info("MCP tool execution", "tool", "webview_update", "view", input.ViewID, "user", log.Username())
if core.Trim(input.ViewID) == "" {
return nil, WebviewUpdateOutput{}, log.E("webviewUpdate", "view_id is required", nil)
}
now := time.Now()
embeddedViewMu.Lock()
view, ok := embeddedViewRegistry[input.ViewID]
if !ok {
// Updating a view that was never rendered creates one lazily so
// clients that reconnect mid-session get a consistent snapshot.
view = &embeddedView{ViewID: input.ViewID, State: map[string]any{}}
embeddedViewRegistry[input.ViewID] = view
}
if input.HTML != "" {
view.HTML = input.HTML
}
if input.Title != "" {
view.Title = input.Title
}
if input.State != nil {
merge := input.Merge || len(view.State) == 0
if merge {
if view.State == nil {
view.State = map[string]any{}
}
for k, v := range input.State {
view.State[k] = v
}
} else {
view.State = cloneStateMap(input.State)
}
}
view.UpdatedAt = now
snapshot := *view
snapshot.State = cloneStateMap(view.State)
embeddedViewMu.Unlock()
s.ChannelSend(ctx, ChannelWebviewUpdate, map[string]any{
"view_id": snapshot.ViewID,
"title": snapshot.Title,
"html": snapshot.HTML,
"width": snapshot.Width,
"height": snapshot.Height,
"state": snapshot.State,
"updatedAt": snapshot.UpdatedAt,
})
return nil, WebviewUpdateOutput{
Success: true,
ViewID: snapshot.ViewID,
UpdatedAt: snapshot.UpdatedAt,
}, nil
}
// cloneStateMap returns a shallow copy of a state map.
//
// cloned := cloneStateMap(map[string]any{"a": 1}) // cloned["a"] == 1
func cloneStateMap(in map[string]any) map[string]any {
if in == nil {
return nil
}
out := make(map[string]any, len(in))
for k, v := range in {
out[k] = v
}
return out
}
// lookupEmbeddedView returns the current snapshot of an embedded view, if any.
//
// view, ok := lookupEmbeddedView("dashboard")
func lookupEmbeddedView(id string) (*embeddedView, bool) {
embeddedViewMu.RLock()
defer embeddedViewMu.RUnlock()
view, ok := embeddedViewRegistry[id]
if !ok {
return nil, false
}
snapshot := *view
snapshot.State = cloneStateMap(view.State)
return &snapshot, true
}
// resetEmbeddedViews clears the registry. Intended for tests.
func resetEmbeddedViews() {
embeddedViewMu.Lock()
defer embeddedViewMu.Unlock()
embeddedViewRegistry = map[string]*embeddedView{}
}

View file

@ -0,0 +1,137 @@
// SPDX-License-Identifier: EUPL-1.2
package mcp
import (
"context"
"testing"
)
// TestToolsWebviewEmbed_WebviewRender_Good registers a view and verifies the
// registry keeps the rendered HTML and state.
func TestToolsWebviewEmbed_WebviewRender_Good(t *testing.T) {
t.Cleanup(resetEmbeddedViews)
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, out, err := svc.webviewRender(context.Background(), nil, WebviewRenderInput{
ViewID: "dashboard",
HTML: "<p>hello</p>",
Title: "Demo",
State: map[string]any{"count": 1},
})
if err != nil {
t.Fatalf("webviewRender returned error: %v", err)
}
if !out.Success {
t.Fatal("expected Success=true")
}
if out.ViewID != "dashboard" {
t.Fatalf("expected view id 'dashboard', got %q", out.ViewID)
}
if out.UpdatedAt.IsZero() {
t.Fatal("expected non-zero UpdatedAt")
}
view, ok := lookupEmbeddedView("dashboard")
if !ok {
t.Fatal("expected view to be stored in registry")
}
if view.HTML != "<p>hello</p>" {
t.Fatalf("expected HTML '<p>hello</p>', got %q", view.HTML)
}
if view.State["count"] != 1 {
t.Fatalf("expected state.count=1, got %v", view.State["count"])
}
}
// TestToolsWebviewEmbed_WebviewRender_Bad ensures empty view IDs are rejected.
func TestToolsWebviewEmbed_WebviewRender_Bad(t *testing.T) {
t.Cleanup(resetEmbeddedViews)
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, _, err = svc.webviewRender(context.Background(), nil, WebviewRenderInput{})
if err == nil {
t.Fatal("expected error for empty view_id")
}
}
// TestToolsWebviewEmbed_WebviewUpdate_Good merges a state patch into the
// previously rendered view.
func TestToolsWebviewEmbed_WebviewUpdate_Good(t *testing.T) {
t.Cleanup(resetEmbeddedViews)
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, _, err = svc.webviewRender(context.Background(), nil, WebviewRenderInput{
ViewID: "dashboard",
HTML: "<p>hello</p>",
State: map[string]any{"count": 1},
})
if err != nil {
t.Fatalf("seed render failed: %v", err)
}
_, out, err := svc.webviewUpdate(context.Background(), nil, WebviewUpdateInput{
ViewID: "dashboard",
State: map[string]any{"theme": "dark"},
Merge: true,
})
if err != nil {
t.Fatalf("webviewUpdate returned error: %v", err)
}
if !out.Success {
t.Fatal("expected Success=true")
}
view, ok := lookupEmbeddedView("dashboard")
if !ok {
t.Fatal("expected view to exist after update")
}
if view.State["count"] != 1 {
t.Fatalf("expected count to persist after merge, got %v", view.State["count"])
}
if view.State["theme"] != "dark" {
t.Fatalf("expected theme 'dark' after merge, got %v", view.State["theme"])
}
}
// TestToolsWebviewEmbed_WebviewUpdate_Ugly updates a view that was never
// rendered and verifies a fresh registry entry is created.
func TestToolsWebviewEmbed_WebviewUpdate_Ugly(t *testing.T) {
t.Cleanup(resetEmbeddedViews)
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, out, err := svc.webviewUpdate(context.Background(), nil, WebviewUpdateInput{
ViewID: "ghost",
HTML: "<p>new</p>",
})
if err != nil {
t.Fatalf("webviewUpdate returned error: %v", err)
}
if !out.Success {
t.Fatal("expected Success=true for lazy-create update")
}
view, ok := lookupEmbeddedView("ghost")
if !ok {
t.Fatal("expected ghost view to be created lazily")
}
if view.HTML != "<p>new</p>" {
t.Fatalf("expected HTML '<p>new</p>', got %q", view.HTML)
}
}

View file

@ -11,7 +11,7 @@ import (
"testing" "testing"
"time" "time"
"forge.lthn.ai/core/go-webview" "dappco.re/go/core/webview"
) )
// skipIfShort skips webview tests in short mode (go test -short). // skipIfShort skips webview tests in short mode (go test -short).

View file

@ -8,8 +8,8 @@ import (
"net/http" "net/http"
core "dappco.re/go/core" core "dappco.re/go/core"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )

264
pkg/mcp/tools_ws_client.go Normal file
View file

@ -0,0 +1,264 @@
// SPDX-License-Identifier: EUPL-1.2
package mcp
import (
"context"
"crypto/rand"
"encoding/hex"
"net/http"
"sync"
"time"
core "dappco.re/go/core"
"dappco.re/go/core/log"
"github.com/gorilla/websocket"
"github.com/modelcontextprotocol/go-sdk/mcp"
)
// WSConnectInput contains parameters for opening an outbound WebSocket
// connection from the MCP server. Each connection is given a stable ID that
// subsequent ws_send and ws_close calls use to address it.
//
// input := WSConnectInput{URL: "wss://example.com/ws", Timeout: 10}
type WSConnectInput struct {
URL string `json:"url"` // e.g. "wss://example.com/ws"
Headers map[string]string `json:"headers,omitempty"` // custom request headers
Timeout int `json:"timeout,omitempty"` // handshake timeout in seconds (default: 30)
}
// WSConnectOutput contains the result of opening a WebSocket connection.
//
// // out.Success == true, out.ID == "ws-0af3…"
type WSConnectOutput struct {
Success bool `json:"success"` // true when the handshake completed
ID string `json:"id"` // e.g. "ws-0af3…"
URL string `json:"url"` // the URL that was dialled
}
// WSSendInput contains parameters for sending a message on an open
// WebSocket connection.
//
// input := WSSendInput{ID: "ws-0af3…", Message: "ping"}
type WSSendInput struct {
ID string `json:"id"` // e.g. "ws-0af3…"
Message string `json:"message"` // payload to send
Binary bool `json:"binary,omitempty"` // true to send a binary frame (payload is base64 text)
}
// WSSendOutput contains the result of sending a message.
//
// // out.Success == true, out.ID == "ws-0af3…"
type WSSendOutput struct {
Success bool `json:"success"` // true when the message was written
ID string `json:"id"` // e.g. "ws-0af3…"
Bytes int `json:"bytes"` // number of bytes written
}
// WSCloseInput contains parameters for closing a WebSocket connection.
//
// input := WSCloseInput{ID: "ws-0af3…", Reason: "done"}
type WSCloseInput struct {
ID string `json:"id"` // e.g. "ws-0af3…"
Code int `json:"code,omitempty"` // close code (default: 1000 - normal closure)
Reason string `json:"reason,omitempty"` // human-readable reason
}
// WSCloseOutput contains the result of closing a WebSocket connection.
//
// // out.Success == true, out.ID == "ws-0af3…"
type WSCloseOutput struct {
Success bool `json:"success"` // true when the connection was closed
ID string `json:"id"` // e.g. "ws-0af3…"
Message string `json:"message,omitempty"` // e.g. "connection closed"
}
// wsClientConn tracks an outbound WebSocket connection tied to a stable ID.
type wsClientConn struct {
ID string
URL string
conn *websocket.Conn
writeMu sync.Mutex
CreatedAt time.Time
}
// wsClientRegistry holds all live outbound WebSocket connections keyed by ID.
// Access is guarded by wsClientMu.
var (
wsClientMu sync.Mutex
wsClientRegistry = map[string]*wsClientConn{}
)
// registerWSClientTools registers the outbound WebSocket client tools.
func (s *Service) registerWSClientTools(server *mcp.Server) {
addToolRecorded(s, server, "ws", &mcp.Tool{
Name: "ws_connect",
Description: "Open an outbound WebSocket connection. Returns a connection ID for subsequent ws_send and ws_close calls.",
}, s.wsConnect)
addToolRecorded(s, server, "ws", &mcp.Tool{
Name: "ws_send",
Description: "Send a text or binary message on an open WebSocket connection identified by ID.",
}, s.wsSend)
addToolRecorded(s, server, "ws", &mcp.Tool{
Name: "ws_close",
Description: "Close an open WebSocket connection identified by ID.",
}, s.wsClose)
}
// wsConnect handles the ws_connect tool call.
func (s *Service) wsConnect(ctx context.Context, req *mcp.CallToolRequest, input WSConnectInput) (*mcp.CallToolResult, WSConnectOutput, error) {
s.logger.Security("MCP tool execution", "tool", "ws_connect", "url", input.URL, "user", log.Username())
if core.Trim(input.URL) == "" {
return nil, WSConnectOutput{}, log.E("wsConnect", "url is required", nil)
}
timeout := time.Duration(input.Timeout) * time.Second
if timeout <= 0 {
timeout = 30 * time.Second
}
dialer := websocket.Dialer{
HandshakeTimeout: timeout,
}
headers := http.Header{}
for k, v := range input.Headers {
headers.Set(k, v)
}
dialCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
conn, _, err := dialer.DialContext(dialCtx, input.URL, headers)
if err != nil {
log.Error("mcp: ws connect failed", "url", input.URL, "err", err)
return nil, WSConnectOutput{}, log.E("wsConnect", "failed to connect", err)
}
id := newWSClientID()
client := &wsClientConn{
ID: id,
URL: input.URL,
conn: conn,
CreatedAt: time.Now(),
}
wsClientMu.Lock()
wsClientRegistry[id] = client
wsClientMu.Unlock()
return nil, WSConnectOutput{
Success: true,
ID: id,
URL: input.URL,
}, nil
}
// wsSend handles the ws_send tool call.
func (s *Service) wsSend(ctx context.Context, req *mcp.CallToolRequest, input WSSendInput) (*mcp.CallToolResult, WSSendOutput, error) {
s.logger.Info("MCP tool execution", "tool", "ws_send", "id", input.ID, "binary", input.Binary, "user", log.Username())
if core.Trim(input.ID) == "" {
return nil, WSSendOutput{}, log.E("wsSend", "id is required", nil)
}
client, ok := getWSClient(input.ID)
if !ok {
return nil, WSSendOutput{}, log.E("wsSend", "connection not found", nil)
}
messageType := websocket.TextMessage
if input.Binary {
messageType = websocket.BinaryMessage
}
client.writeMu.Lock()
err := client.conn.WriteMessage(messageType, []byte(input.Message))
client.writeMu.Unlock()
if err != nil {
log.Error("mcp: ws send failed", "id", input.ID, "err", err)
return nil, WSSendOutput{}, log.E("wsSend", "failed to send message", err)
}
return nil, WSSendOutput{
Success: true,
ID: input.ID,
Bytes: len(input.Message),
}, nil
}
// wsClose handles the ws_close tool call.
func (s *Service) wsClose(ctx context.Context, req *mcp.CallToolRequest, input WSCloseInput) (*mcp.CallToolResult, WSCloseOutput, error) {
s.logger.Info("MCP tool execution", "tool", "ws_close", "id", input.ID, "user", log.Username())
if core.Trim(input.ID) == "" {
return nil, WSCloseOutput{}, log.E("wsClose", "id is required", nil)
}
wsClientMu.Lock()
client, ok := wsClientRegistry[input.ID]
if ok {
delete(wsClientRegistry, input.ID)
}
wsClientMu.Unlock()
if !ok {
return nil, WSCloseOutput{}, log.E("wsClose", "connection not found", nil)
}
code := input.Code
if code == 0 {
code = websocket.CloseNormalClosure
}
reason := input.Reason
if reason == "" {
reason = "closed"
}
client.writeMu.Lock()
_ = client.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(code, reason),
time.Now().Add(5*time.Second),
)
client.writeMu.Unlock()
_ = client.conn.Close()
return nil, WSCloseOutput{
Success: true,
ID: input.ID,
Message: "connection closed",
}, nil
}
// newWSClientID returns a fresh identifier for an outbound WebSocket client.
//
// id := newWSClientID() // "ws-0af3…"
func newWSClientID() string {
var buf [8]byte
_, _ = rand.Read(buf[:])
return "ws-" + hex.EncodeToString(buf[:])
}
// getWSClient returns a tracked outbound WebSocket client by ID, if any.
//
// client, ok := getWSClient("ws-0af3…")
func getWSClient(id string) (*wsClientConn, bool) {
wsClientMu.Lock()
defer wsClientMu.Unlock()
client, ok := wsClientRegistry[id]
return client, ok
}
// resetWSClients drops all tracked outbound WebSocket clients. Intended for tests.
func resetWSClients() {
wsClientMu.Lock()
defer wsClientMu.Unlock()
for id, client := range wsClientRegistry {
_ = client.conn.Close()
delete(wsClientRegistry, id)
}
}

View file

@ -0,0 +1,169 @@
// SPDX-License-Identifier: EUPL-1.2
package mcp
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
)
// TestToolsWSClient_WSConnect_Good dials a test WebSocket server and verifies
// the handshake completes and a client ID is assigned.
func TestToolsWSClient_WSConnect_Good(t *testing.T) {
t.Cleanup(resetWSClients)
server := startTestWSServer(t)
defer server.Close()
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, out, err := svc.wsConnect(context.Background(), nil, WSConnectInput{
URL: "ws" + strings.TrimPrefix(server.URL, "http") + "/ws",
Timeout: 5,
})
if err != nil {
t.Fatalf("wsConnect failed: %v", err)
}
if !out.Success {
t.Fatal("expected Success=true")
}
if !strings.HasPrefix(out.ID, "ws-") {
t.Fatalf("expected ID prefix 'ws-', got %q", out.ID)
}
_, _, err = svc.wsClose(context.Background(), nil, WSCloseInput{ID: out.ID})
if err != nil {
t.Fatalf("wsClose failed: %v", err)
}
}
// TestToolsWSClient_WSConnect_Bad rejects empty URLs.
func TestToolsWSClient_WSConnect_Bad(t *testing.T) {
t.Cleanup(resetWSClients)
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, _, err = svc.wsConnect(context.Background(), nil, WSConnectInput{})
if err == nil {
t.Fatal("expected error for empty URL")
}
}
// TestToolsWSClient_WSSendClose_Good sends a message on an open connection
// and then closes it.
func TestToolsWSClient_WSSendClose_Good(t *testing.T) {
t.Cleanup(resetWSClients)
server := startTestWSServer(t)
defer server.Close()
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, conn, err := svc.wsConnect(context.Background(), nil, WSConnectInput{
URL: "ws" + strings.TrimPrefix(server.URL, "http") + "/ws",
Timeout: 5,
})
if err != nil {
t.Fatalf("wsConnect failed: %v", err)
}
_, sendOut, err := svc.wsSend(context.Background(), nil, WSSendInput{
ID: conn.ID,
Message: "ping",
})
if err != nil {
t.Fatalf("wsSend failed: %v", err)
}
if !sendOut.Success {
t.Fatal("expected Success=true for wsSend")
}
if sendOut.Bytes != 4 {
t.Fatalf("expected 4 bytes written, got %d", sendOut.Bytes)
}
_, closeOut, err := svc.wsClose(context.Background(), nil, WSCloseInput{ID: conn.ID})
if err != nil {
t.Fatalf("wsClose failed: %v", err)
}
if !closeOut.Success {
t.Fatal("expected Success=true for wsClose")
}
if _, ok := getWSClient(conn.ID); ok {
t.Fatal("expected connection to be removed after close")
}
}
// TestToolsWSClient_WSSend_Bad rejects unknown connection IDs.
func TestToolsWSClient_WSSend_Bad(t *testing.T) {
t.Cleanup(resetWSClients)
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, _, err = svc.wsSend(context.Background(), nil, WSSendInput{ID: "ws-missing", Message: "x"})
if err == nil {
t.Fatal("expected error for unknown connection ID")
}
}
// TestToolsWSClient_WSClose_Bad rejects closes for unknown connection IDs.
func TestToolsWSClient_WSClose_Bad(t *testing.T) {
t.Cleanup(resetWSClients)
svc, err := New(Options{WorkspaceRoot: t.TempDir()})
if err != nil {
t.Fatal(err)
}
_, _, err = svc.wsClose(context.Background(), nil, WSCloseInput{ID: "ws-missing"})
if err == nil {
t.Fatal("expected error for unknown connection ID")
}
}
// startTestWSServer returns an httptest.Server running a minimal echo WebSocket
// handler used by the ws_connect/ws_send tests.
func startTestWSServer(t *testing.T) *httptest.Server {
t.Helper()
upgrader := websocket.Upgrader{
CheckOrigin: func(*http.Request) bool { return true },
}
mux := http.NewServeMux()
mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
for {
_, msg, err := conn.ReadMessage()
if err != nil {
return
}
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return
}
}
})
return httptest.NewServer(mux)
}

View file

@ -3,7 +3,7 @@ package mcp
import ( import (
"testing" "testing"
"forge.lthn.ai/core/go-ws" "dappco.re/go/core/ws"
) )
// TestWSToolsRegistered_Good verifies that WebSocket tools are registered when hub is available. // TestWSToolsRegistered_Good verifies that WebSocket tools are registered when hub is available.

View file

@ -4,14 +4,16 @@ package mcp
import ( import (
"context" "context"
"crypto/subtle" "encoding/json"
"net" "net"
"net/http" "net/http"
"os"
"strings" "strings"
"time" "time"
coreerr "forge.lthn.ai/core/go-log" core "dappco.re/go/core"
coreerr "dappco.re/go/core/log"
api "dappco.re/go/core/api"
"github.com/gin-gonic/gin"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -32,12 +34,18 @@ const DefaultHTTPAddr = "127.0.0.1:9101"
// svc.ServeHTTP(ctx, "0.0.0.0:9101") // svc.ServeHTTP(ctx, "0.0.0.0:9101")
// //
// Endpoint /mcp: GET (SSE stream), POST (JSON-RPC), DELETE (terminate session). // Endpoint /mcp: GET (SSE stream), POST (JSON-RPC), DELETE (terminate session).
//
// Additional endpoints:
// - POST /mcp/auth: exchange API token for JWT
// - /v1/tools/<tool_name>: auto-mounted REST bridge for MCP tools
// - /health: unauthenticated health endpoint
// - /.well-known/mcp-servers.json: MCP portal discovery
func (s *Service) ServeHTTP(ctx context.Context, addr string) error { func (s *Service) ServeHTTP(ctx context.Context, addr string) error {
if addr == "" { if addr == "" {
addr = DefaultHTTPAddr addr = DefaultHTTPAddr
} }
authToken := os.Getenv("MCP_AUTH_TOKEN") authToken := core.Env("MCP_AUTH_TOKEN")
handler := mcp.NewStreamableHTTPHandler( handler := mcp.NewStreamableHTTPHandler(
func(r *http.Request) *mcp.Server { func(r *http.Request) *mcp.Server {
@ -48,13 +56,25 @@ func (s *Service) ServeHTTP(ctx context.Context, addr string) error {
}, },
) )
toolBridge := api.NewToolBridge("/v1/tools")
BridgeToAPI(s, toolBridge)
toolEngine := gin.New()
toolBridge.RegisterRoutes(toolEngine.Group("/v1/tools"))
toolHandler := withAuth(authToken, toolEngine)
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle("/mcp", withAuth(authToken, handler)) mux.Handle("/mcp", withAuth(authToken, handler))
mux.Handle("/v1/tools", toolHandler)
mux.Handle("/v1/tools/", toolHandler)
mux.HandleFunc("/mcp/auth", func(w http.ResponseWriter, r *http.Request) {
serveMCPAuthExchange(w, r, authToken)
})
mux.HandleFunc("/.well-known/mcp-servers.json", handleMCPDiscovery)
// Health check (no auth) // Health check (no auth)
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"ok"}`)) _ = json.NewEncoder(w).Encode(map[string]any{"status": "ok"})
}) })
listener, err := net.Listen("tcp", addr) listener, err := net.Listen("tcp", addr)
@ -72,7 +92,7 @@ func (s *Service) ServeHTTP(ctx context.Context, addr string) error {
<-ctx.Done() <-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
server.Shutdown(shutdownCtx) _ = server.Shutdown(shutdownCtx)
}() }()
if err := server.Serve(listener); err != nil && err != http.ErrServerClosed { if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
@ -81,31 +101,185 @@ func (s *Service) ServeHTTP(ctx context.Context, addr string) error {
return nil return nil
} }
type mcpAuthExchangeRequest struct {
Token string `json:"token"`
Workspace string `json:"workspace"`
Entitlements []string `json:"entitlements"`
Sub string `json:"sub"`
}
type mcpAuthExchangeResponse struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
ExpiresIn int64 `json:"expires_in"`
ExpiresAt int64 `json:"expires_at"`
}
type mcpDiscoveryResponse struct {
Servers []mcpDiscoveryServer `json:"servers"`
}
type mcpDiscoveryServer struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Connection map[string]any `json:"connection"`
Capabilities []string `json:"capabilities"`
UseWhen []string `json:"use_when"`
RelatedServers []string `json:"related_servers"`
}
// withAuth wraps an http.Handler with Bearer token authentication. // withAuth wraps an http.Handler with Bearer token authentication.
// If token is empty, authentication is disabled for local development. // If token is empty, authentication is disabled for local development.
func withAuth(token string, next http.Handler) http.Handler { func withAuth(token string, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.TrimSpace(token) == "" { if core.Trim(token) == "" {
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
return return
} }
auth := r.Header.Get("Authorization") claims, err := parseAuthClaims(r.Header.Get("Authorization"), token)
if !strings.HasPrefix(auth, "Bearer ") { if err != nil {
http.Error(w, `{"error":"missing Bearer token"}`, http.StatusUnauthorized)
return
}
provided := strings.TrimSpace(strings.TrimPrefix(auth, "Bearer "))
if len(provided) == 0 {
http.Error(w, `{"error":"missing Bearer token"}`, http.StatusUnauthorized)
return
}
if subtle.ConstantTimeCompare([]byte(provided), []byte(token)) != 1 {
http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized) http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized)
return return
} }
if claims != nil {
r = r.WithContext(withAuthClaims(r.Context(), claims))
}
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
}) })
} }
func serveMCPAuthExchange(w http.ResponseWriter, r *http.Request, apiToken string) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
apiToken = core.Trim(apiToken)
if apiToken == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusUnauthorized)
_ = json.NewEncoder(w).Encode(api.Fail("unauthorized", "authentication is not configured"))
return
}
var req mcpAuthExchangeRequest
if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 10<<20)).Decode(&req); err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(api.Fail("invalid_request", "invalid JSON payload"))
return
}
providedToken := strings.TrimSpace(extractBearerToken(r.Header.Get("Authorization")))
if providedToken == "" {
providedToken = strings.TrimSpace(req.Token)
}
if providedToken == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(api.Fail("invalid_request", "missing token"))
return
}
if _, err := parseAuthClaims("Bearer "+providedToken, apiToken); err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusUnauthorized)
_ = json.NewEncoder(w).Encode(api.Fail("unauthorized", "invalid API token"))
return
}
cfg := currentAuthConfig(apiToken)
now := time.Now()
claims := authClaims{
Workspace: strings.TrimSpace(req.Workspace),
Entitlements: dedupeEntitlements(req.Entitlements),
Subject: core.Trim(req.Sub),
IssuedAt: now.Unix(),
ExpiresAt: now.Unix() + int64(cfg.ttl.Seconds()),
}
minted, err := mintJWTToken(claims, cfg)
if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(api.Fail("token_error", "failed to mint token"))
return
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(mcpAuthExchangeResponse{
AccessToken: minted,
TokenType: "Bearer",
ExpiresIn: int64(cfg.ttl.Seconds()),
ExpiresAt: claims.ExpiresAt,
})
}
func dedupeEntitlements(entitlements []string) []string {
if len(entitlements) == 0 {
return nil
}
seen := make(map[string]struct{}, len(entitlements))
out := make([]string, 0, len(entitlements))
for _, ent := range entitlements {
e := strings.TrimSpace(ent)
if e == "" {
continue
}
if _, ok := seen[e]; ok {
continue
}
seen[e] = struct{}{}
out = append(out, e)
}
return out
}
func handleMCPDiscovery(w http.ResponseWriter, r *http.Request) {
resp := mcpDiscoveryResponse{
Servers: []mcpDiscoveryServer{
{
ID: "core-agent",
Name: "Core Agent",
Description: "Dispatch agents, manage workspaces, search OpenBrain",
Connection: map[string]any{
"type": "stdio",
"command": "core-agent",
"args": []string{"mcp"},
},
Capabilities: []string{"tools", "resources"},
UseWhen: []string{
"Need to dispatch work to Codex/Claude/Gemini",
"Need workspace status",
"Need semantic search",
},
RelatedServers: []string{"core-mcp"},
},
{
ID: "core-mcp",
Name: "Core MCP",
Description: "File ops, process and build tools, RAG search, webview, dashboards — the agent-facing MCP framework.",
Connection: map[string]any{
"type": "stdio",
"command": "core-mcp",
},
Capabilities: []string{"tools", "resources", "logging"},
UseWhen: []string{
"Need to read/write files inside a workspace",
"Need to start or monitor processes",
"Need to run RAG queries or index documents",
"Need to render or update an embedded dashboard view",
},
RelatedServers: []string{"core-agent"},
},
},
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(api.Fail("server_error", "failed to encode discovery payload"))
}
}

View file

@ -6,7 +6,7 @@ import (
"context" "context"
"os" "os"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )

View file

@ -5,12 +5,12 @@ package mcp
import ( import (
"bufio" "bufio"
"context" "context"
"fmt"
goio "io" goio "io"
"net" "net"
"os" "os"
"sync" "sync"
core "dappco.re/go/core"
"github.com/modelcontextprotocol/go-sdk/jsonrpc" "github.com/modelcontextprotocol/go-sdk/jsonrpc"
"github.com/modelcontextprotocol/go-sdk/mcp" "github.com/modelcontextprotocol/go-sdk/mcp"
) )
@ -31,7 +31,7 @@ var diagWriter goio.Writer = os.Stderr
func diagPrintf(format string, args ...any) { func diagPrintf(format string, args ...any) {
diagMu.Lock() diagMu.Lock()
defer diagMu.Unlock() defer diagMu.Unlock()
fmt.Fprintf(diagWriter, format, args...) core.Print(diagWriter, format, args...)
} }
// setDiagWriter swaps the diagnostic writer and returns the previous one. // setDiagWriter swaps the diagnostic writer and returns the previous one.

View file

@ -6,8 +6,8 @@ import (
"context" "context"
"net" "net"
"forge.lthn.ai/core/go-io" "dappco.re/go/core/io"
"forge.lthn.ai/core/go-log" "dappco.re/go/core/log"
) )
// ServeUnix starts a Unix domain socket server for the MCP service. // ServeUnix starts a Unix domain socket server for the MCP service.