refactor(agentic): centralise remote MCP client
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
8c2884cc6f
commit
2d3ec1e8c3
4 changed files with 161 additions and 111 deletions
|
|
@ -49,9 +49,7 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque
|
|||
return nil, RemoteDispatchOutput{}, core.E("dispatchRemote", "task is required", nil)
|
||||
}
|
||||
|
||||
addr := resolveHost(input.Host)
|
||||
|
||||
token := remoteToken(input.Host)
|
||||
client := NewRemoteClient(input.Host)
|
||||
|
||||
callParams := map[string]any{
|
||||
"repo": input.Repo,
|
||||
|
|
@ -73,59 +71,6 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque
|
|||
callParams["variables"] = input.Variables
|
||||
}
|
||||
|
||||
rpcRequest := map[string]any{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "tools/call",
|
||||
"params": map[string]any{
|
||||
"name": "agentic_dispatch",
|
||||
"arguments": callParams,
|
||||
},
|
||||
}
|
||||
body := []byte(core.JSONMarshalString(rpcRequest))
|
||||
|
||||
url := core.Sprintf("http://%s/mcp", addr)
|
||||
|
||||
sessionResult := mcpInitializeResult(ctx, url, token)
|
||||
if !sessionResult.OK {
|
||||
err, _ := sessionResult.Value.(error)
|
||||
if err == nil {
|
||||
err = core.E("dispatchRemote", "MCP initialize failed", nil)
|
||||
}
|
||||
return nil, RemoteDispatchOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Sprintf("init failed: %v", err),
|
||||
}, core.E("dispatchRemote", "MCP initialize failed", err)
|
||||
}
|
||||
sessionID, ok := sessionResult.Value.(string)
|
||||
if !ok || sessionID == "" {
|
||||
err := core.E("dispatchRemote", "invalid session id", nil)
|
||||
return nil, RemoteDispatchOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Sprintf("init failed: %v", err),
|
||||
}, err
|
||||
}
|
||||
|
||||
callResult := mcpCallResult(ctx, url, token, sessionID, body)
|
||||
if !callResult.OK {
|
||||
err, _ := callResult.Value.(error)
|
||||
if err == nil {
|
||||
err = core.E("dispatchRemote", "tool call failed", nil)
|
||||
}
|
||||
return nil, RemoteDispatchOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Sprintf("call failed: %v", err),
|
||||
}, core.E("dispatchRemote", "tool call failed", err)
|
||||
}
|
||||
result, ok := callResult.Value.([]byte)
|
||||
if !ok {
|
||||
err := core.E("dispatchRemote", "invalid tool response", nil)
|
||||
return nil, RemoteDispatchOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Sprintf("call failed: %v", err),
|
||||
}, err
|
||||
}
|
||||
|
||||
output := RemoteDispatchOutput{
|
||||
Success: true,
|
||||
Host: input.Host,
|
||||
|
|
@ -133,6 +78,22 @@ func (s *PrepSubsystem) dispatchRemote(ctx context.Context, _ *mcp.CallToolReque
|
|||
Agent: input.Agent,
|
||||
}
|
||||
|
||||
sessionID, err := client.Initialize(ctx)
|
||||
if err != nil {
|
||||
return nil, RemoteDispatchOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Sprintf("init failed: %v", err),
|
||||
}, core.E("dispatchRemote", "MCP initialize failed", err)
|
||||
}
|
||||
|
||||
result, err := client.Call(ctx, sessionID, client.ToolCallBody(1, "agentic_dispatch", callParams))
|
||||
if err != nil {
|
||||
return nil, RemoteDispatchOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Sprintf("call failed: %v", err),
|
||||
}, core.E("dispatchRemote", "tool call failed", err)
|
||||
}
|
||||
|
||||
var rpcResponse struct {
|
||||
Result struct {
|
||||
Content []struct {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,77 @@
|
|||
// SPDX-License-Identifier: EUPL-1.2
|
||||
|
||||
package agentic
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
core "dappco.re/go/core"
|
||||
)
|
||||
|
||||
// client := agentic.NewRemoteClient("charon")
|
||||
// core.Println(client.URL) // "http://10.69.69.165:9101/mcp"
|
||||
type RemoteClient struct {
|
||||
Host string
|
||||
Address string
|
||||
Token string
|
||||
URL string
|
||||
}
|
||||
|
||||
// client := agentic.NewRemoteClient("charon")
|
||||
func NewRemoteClient(host string) RemoteClient {
|
||||
address := resolveHost(host)
|
||||
return RemoteClient{
|
||||
Host: host,
|
||||
Address: address,
|
||||
Token: remoteToken(host),
|
||||
URL: core.Sprintf("http://%s/mcp", address),
|
||||
}
|
||||
}
|
||||
|
||||
// sessionID, err := client.Initialize(context.Background())
|
||||
func (c RemoteClient) Initialize(ctx context.Context) (string, error) {
|
||||
result := mcpInitializeResult(ctx, c.URL, c.Token)
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
if err == nil {
|
||||
err = core.E("remoteClient.Initialize", "MCP initialise failed", nil)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
sessionID, ok := result.Value.(string)
|
||||
if !ok || sessionID == "" {
|
||||
return "", core.E("remoteClient.Initialize", "invalid session id", nil)
|
||||
}
|
||||
return sessionID, nil
|
||||
}
|
||||
|
||||
// response, err := client.Call(context.Background(), "session-1", core.JSONMarshalString(map[string]any{"method":"tools/call"}))
|
||||
func (c RemoteClient) Call(ctx context.Context, sessionID string, body []byte) ([]byte, error) {
|
||||
result := mcpCallResult(ctx, c.URL, c.Token, sessionID, body)
|
||||
if !result.OK {
|
||||
err, _ := result.Value.(error)
|
||||
if err == nil {
|
||||
err = core.E("remoteClient.Call", "tool call failed", nil)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
response, ok := result.Value.([]byte)
|
||||
if !ok {
|
||||
return nil, core.E("remoteClient.Call", "invalid response payload", nil)
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// body := client.ToolCallBody("agentic_status", map[string]any{})
|
||||
func (c RemoteClient) ToolCallBody(id int, name string, arguments map[string]any) []byte {
|
||||
request := map[string]any{
|
||||
"jsonrpc": "2.0",
|
||||
"id": id,
|
||||
"method": "tools/call",
|
||||
"params": map[string]any{
|
||||
"name": name,
|
||||
"arguments": arguments,
|
||||
},
|
||||
}
|
||||
return []byte(core.JSONMarshalString(request))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -283,3 +283,55 @@ func TestRemoteclient_DrainSSE_Ugly_VeryLargeResponse(t *testing.T) {
|
|||
// Should drain all lines without panic
|
||||
assert.NotPanics(t, func() { drainSSE(resp) })
|
||||
}
|
||||
|
||||
// --- RemoteClient ---
|
||||
|
||||
func TestRemoteClient_NewRemoteClient_Good(t *testing.T) {
|
||||
t.Setenv("AGENT_TOKEN_CHARON", "token-123")
|
||||
|
||||
client := NewRemoteClient("charon")
|
||||
|
||||
assert.Equal(t, "charon", client.Host)
|
||||
assert.Equal(t, "10.69.69.165:9101", client.Address)
|
||||
assert.Equal(t, "token-123", client.Token)
|
||||
assert.Equal(t, "http://10.69.69.165:9101/mcp", client.URL)
|
||||
}
|
||||
|
||||
func TestRemoteClient_ToolCallBody_Good(t *testing.T) {
|
||||
client := NewRemoteClient("local")
|
||||
|
||||
body := client.ToolCallBody(7, "agentic_status", map[string]any{
|
||||
"workspace": "core/go-io/task-5",
|
||||
})
|
||||
|
||||
var payload map[string]any
|
||||
result := core.JSONUnmarshal(body, &payload)
|
||||
require.True(t, result.OK)
|
||||
assert.Equal(t, "2.0", payload["jsonrpc"])
|
||||
assert.Equal(t, float64(7), payload["id"])
|
||||
assert.Equal(t, "tools/call", payload["method"])
|
||||
params, ok := payload["params"].(map[string]any)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "agentic_status", params["name"])
|
||||
}
|
||||
|
||||
func TestRemoteClient_NewRemoteClient_Bad(t *testing.T) {
|
||||
client := NewRemoteClient("")
|
||||
|
||||
assert.Equal(t, ":9101", client.Address)
|
||||
assert.Equal(t, "http://:9101/mcp", client.URL)
|
||||
}
|
||||
|
||||
func TestRemoteClient_ToolCallBody_Ugly(t *testing.T) {
|
||||
client := NewRemoteClient("my-host.local")
|
||||
|
||||
body := client.ToolCallBody(0, "", nil)
|
||||
|
||||
var payload map[string]any
|
||||
result := core.JSONUnmarshal(body, &payload)
|
||||
require.True(t, result.OK)
|
||||
params, ok := payload["params"].(map[string]any)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, "", params["name"])
|
||||
assert.Nil(t, params["arguments"])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,66 +31,29 @@ func (s *PrepSubsystem) statusRemote(ctx context.Context, _ *mcp.CallToolRequest
|
|||
return nil, RemoteStatusOutput{}, core.E("statusRemote", "host is required", nil)
|
||||
}
|
||||
|
||||
addr := resolveHost(input.Host)
|
||||
token := remoteToken(input.Host)
|
||||
url := core.Concat("http://", addr, "/mcp")
|
||||
|
||||
sessionResult := mcpInitializeResult(ctx, url, token)
|
||||
if !sessionResult.OK {
|
||||
err, _ := sessionResult.Value.(error)
|
||||
if err == nil {
|
||||
err = core.E("statusRemote", "MCP initialize failed", nil)
|
||||
}
|
||||
return nil, RemoteStatusOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Concat("unreachable: ", err.Error()),
|
||||
}, nil
|
||||
}
|
||||
sessionID, ok := sessionResult.Value.(string)
|
||||
if !ok || sessionID == "" {
|
||||
err := core.E("statusRemote", "invalid session id", nil)
|
||||
return nil, RemoteStatusOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Concat("unreachable: ", err.Error()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
rpcRequest := map[string]any{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 2,
|
||||
"method": "tools/call",
|
||||
"params": map[string]any{
|
||||
"name": "agentic_status",
|
||||
"arguments": map[string]any{},
|
||||
},
|
||||
}
|
||||
body := []byte(core.JSONMarshalString(rpcRequest))
|
||||
|
||||
callResult := mcpCallResult(ctx, url, token, sessionID, body)
|
||||
if !callResult.OK {
|
||||
err, _ := callResult.Value.(error)
|
||||
if err == nil {
|
||||
err = core.E("statusRemote", "tool call failed", nil)
|
||||
}
|
||||
return nil, RemoteStatusOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Concat("call failed: ", err.Error()),
|
||||
}, nil
|
||||
}
|
||||
result, ok := callResult.Value.([]byte)
|
||||
if !ok {
|
||||
err := core.E("statusRemote", "invalid tool response", nil)
|
||||
return nil, RemoteStatusOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Concat("call failed: ", err.Error()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
output := RemoteStatusOutput{
|
||||
Success: true,
|
||||
Host: input.Host,
|
||||
}
|
||||
|
||||
client := NewRemoteClient(input.Host)
|
||||
|
||||
sessionID, err := client.Initialize(ctx)
|
||||
if err != nil {
|
||||
return nil, RemoteStatusOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Concat("unreachable: ", err.Error()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
result, err := client.Call(ctx, sessionID, client.ToolCallBody(2, "agentic_status", map[string]any{}))
|
||||
if err != nil {
|
||||
return nil, RemoteStatusOutput{
|
||||
Host: input.Host,
|
||||
Error: core.Concat("call failed: ", err.Error()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
var rpcResponse struct {
|
||||
Result struct {
|
||||
Content []struct {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue