325 lines
10 KiB
Go
325 lines
10 KiB
Go
// SPDX-License-Identifier: EUPL-1.2
|
|
|
|
package agentic
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"time"
|
|
|
|
core "dappco.re/go/core"
|
|
)
|
|
|
|
// defaultClient is the shared HTTP client used by the request helpers in this
// file; every request made through it is bounded by a 30-second timeout.
var defaultClient = &http.Client{
	Timeout: 30 * time.Second,
}
|
|
|
|
// httpStream is a request/response stream backed by a single HTTP endpoint.
// Send performs one request and stores the response body; Receive returns it.
type httpStream struct {
	client   *http.Client // client that executes the request in Send
	url      string       // endpoint targeted by every Send
	token    string       // optional credential; sent as "token <value>" when non-empty
	method   string       // HTTP method used by Send
	response []byte       // body captured by the most recent successful Send
}
|
|
|
|
// stream := &httpStream{client: defaultClient, url: "https://api.lthn.sh/v1/health", method: "POST"}
|
|
// _ = stream.Send([]byte(`{"ping":1}`))
|
|
func (s *httpStream) Send(data []byte) error {
|
|
request, err := http.NewRequestWithContext(context.Background(), s.method, s.url, core.NewReader(string(data)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
request.Header.Set("Content-Type", "application/json")
|
|
request.Header.Set("Accept", "application/json")
|
|
if s.token != "" {
|
|
request.Header.Set("Authorization", core.Concat("token ", s.token))
|
|
}
|
|
|
|
response, err := s.client.Do(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer response.Body.Close()
|
|
|
|
readResult := core.ReadAll(response.Body)
|
|
if !readResult.OK {
|
|
err, _ := readResult.Value.(error)
|
|
return core.E("httpStream.Send", "failed to read response", err)
|
|
}
|
|
s.response = []byte(readResult.Value.(string))
|
|
return nil
|
|
}
|
|
|
|
// stream := &httpStream{}
|
|
// response, err := stream.Receive()
|
|
func (s *httpStream) Receive() ([]byte, error) {
|
|
return s.response, nil
|
|
}
|
|
|
|
// stream := &httpStream{}
|
|
// _ = stream.Close()
|
|
func (s *httpStream) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// agentic.RegisterHTTPTransport(c)
|
|
func RegisterHTTPTransport(c *core.Core) {
|
|
factory := func(handle *core.DriveHandle) (core.Stream, error) {
|
|
token := handle.Options.String("token")
|
|
return &httpStream{
|
|
client: defaultClient,
|
|
url: handle.Transport,
|
|
token: token,
|
|
method: "POST",
|
|
}, nil
|
|
}
|
|
c.API().RegisterProtocol("http", factory)
|
|
c.API().RegisterProtocol("https", factory)
|
|
}
|
|
|
|
// result := agentic.HTTPGet(ctx, "https://forge.lthn.ai/api/v1/repos", "my-token", "token")
|
|
func HTTPGet(ctx context.Context, url, token, authScheme string) core.Result {
|
|
return httpDo(ctx, "GET", url, "", token, authScheme)
|
|
}
|
|
|
|
// result := agentic.HTTPPost(ctx, url, core.JSONMarshalString(payload), token, "token")
|
|
func HTTPPost(ctx context.Context, url, body, token, authScheme string) core.Result {
|
|
return httpDo(ctx, "POST", url, body, token, authScheme)
|
|
}
|
|
|
|
// result := agentic.HTTPPatch(ctx, url, body, token, "token")
|
|
func HTTPPatch(ctx context.Context, url, body, token, authScheme string) core.Result {
|
|
return httpDo(ctx, "PATCH", url, body, token, authScheme)
|
|
}
|
|
|
|
// result := agentic.HTTPDelete(ctx, url, body, token, "Bearer")
|
|
func HTTPDelete(ctx context.Context, url, body, token, authScheme string) core.Result {
|
|
return httpDo(ctx, "DELETE", url, body, token, authScheme)
|
|
}
|
|
|
|
// result := agentic.HTTPDo(ctx, "PUT", url, body, token, "token")
|
|
func HTTPDo(ctx context.Context, method, url, body, token, authScheme string) core.Result {
|
|
return httpDo(ctx, method, url, body, token, authScheme)
|
|
}
|
|
|
|
// result := DriveGet(c, "forge", "/api/v1/repos/core/go-io", "token")
|
|
func DriveGet(c *core.Core, drive, path, authScheme string) core.Result {
|
|
base, token := driveEndpoint(c, drive)
|
|
if base == "" {
|
|
return core.Result{Value: core.E("DriveGet", core.Concat("drive not found: ", drive), nil), OK: false}
|
|
}
|
|
return httpDo(context.Background(), "GET", core.Concat(base, path), "", token, authScheme)
|
|
}
|
|
|
|
// result := DrivePost(c, "forge", "/api/v1/repos/core/go-io/issues", body, "token")
|
|
func DrivePost(c *core.Core, drive, path, body, authScheme string) core.Result {
|
|
base, token := driveEndpoint(c, drive)
|
|
if base == "" {
|
|
return core.Result{Value: core.E("DrivePost", core.Concat("drive not found: ", drive), nil), OK: false}
|
|
}
|
|
return httpDo(context.Background(), "POST", core.Concat(base, path), body, token, authScheme)
|
|
}
|
|
|
|
// result := DriveDo(c, "forge", "PATCH", "/api/v1/repos/core/go-io/pulls/5", body, "token")
|
|
func DriveDo(c *core.Core, drive, method, path, body, authScheme string) core.Result {
|
|
base, token := driveEndpoint(c, drive)
|
|
if base == "" {
|
|
return core.Result{Value: core.E("DriveDo", core.Concat("drive not found: ", drive), nil), OK: false}
|
|
}
|
|
return httpDo(context.Background(), method, core.Concat(base, path), body, token, authScheme)
|
|
}
|
|
|
|
func driveEndpoint(c *core.Core, name string) (base, token string) {
|
|
driveResult := c.Drive().Get(name)
|
|
if !driveResult.OK {
|
|
return "", ""
|
|
}
|
|
driveHandle := driveResult.Value.(*core.DriveHandle)
|
|
return driveHandle.Transport, driveHandle.Options.String("token")
|
|
}
|
|
|
|
func httpDo(ctx context.Context, method, url, body, token, authScheme string) core.Result {
|
|
var request *http.Request
|
|
var requestErr error
|
|
|
|
if body != "" {
|
|
request, requestErr = http.NewRequestWithContext(ctx, method, url, core.NewReader(body))
|
|
} else {
|
|
request, requestErr = http.NewRequestWithContext(ctx, method, url, nil)
|
|
}
|
|
if requestErr != nil {
|
|
return core.Result{Value: core.E("httpDo", "create request", requestErr), OK: false}
|
|
}
|
|
|
|
request.Header.Set("Content-Type", "application/json")
|
|
request.Header.Set("Accept", "application/json")
|
|
if token != "" {
|
|
if authScheme == "" {
|
|
authScheme = "token"
|
|
}
|
|
request.Header.Set("Authorization", core.Concat(authScheme, " ", token))
|
|
}
|
|
|
|
response, requestErr := defaultClient.Do(request)
|
|
if requestErr != nil {
|
|
return core.Result{Value: core.E("httpDo", "request failed", requestErr), OK: false}
|
|
}
|
|
defer response.Body.Close()
|
|
|
|
readResult := core.ReadAll(response.Body)
|
|
if !readResult.OK {
|
|
readErr, _ := readResult.Value.(error)
|
|
return core.Result{Value: core.E("httpDo", "failed to read response", readErr), OK: false}
|
|
}
|
|
|
|
return core.Result{Value: readResult.Value.(string), OK: response.StatusCode < 400}
|
|
}
|
|
|
|
// sessionID, err := mcpInitialize(ctx, url, token)
|
|
func mcpInitialize(ctx context.Context, url, token string) (string, error) {
|
|
result := mcpInitializeResult(ctx, url, token)
|
|
if !result.OK {
|
|
err, _ := result.Value.(error)
|
|
if err == nil {
|
|
return "", core.E("mcpInitialize", "failed", nil)
|
|
}
|
|
return "", err
|
|
}
|
|
sessionID, ok := result.Value.(string)
|
|
if !ok {
|
|
return "", core.E("mcpInitialize", "invalid session id result", nil)
|
|
}
|
|
return sessionID, nil
|
|
}
|
|
|
|
func mcpInitializeResult(ctx context.Context, url, token string) core.Result {
|
|
initializeRequest := map[string]any{
|
|
"jsonrpc": "2.0",
|
|
"id": 1,
|
|
"method": "initialize",
|
|
"params": map[string]any{
|
|
"protocolVersion": "2025-03-26",
|
|
"capabilities": map[string]any{},
|
|
"clientInfo": map[string]any{
|
|
"name": "core-agent-remote",
|
|
"version": "0.2.0",
|
|
},
|
|
},
|
|
}
|
|
|
|
body := core.JSONMarshalString(initializeRequest)
|
|
request, err := http.NewRequestWithContext(ctx, "POST", url, core.NewReader(body))
|
|
if err != nil {
|
|
return core.Result{Value: core.E("mcpInitialize", "create request", err), OK: false}
|
|
}
|
|
mcpHeaders(request, token, "")
|
|
|
|
response, err := defaultClient.Do(request)
|
|
if err != nil {
|
|
return core.Result{Value: core.E("mcpInitialize", "request failed", err), OK: false}
|
|
}
|
|
defer response.Body.Close()
|
|
|
|
if response.StatusCode != 200 {
|
|
return core.Result{Value: core.E("mcpInitialize", core.Sprintf("HTTP %d", response.StatusCode), nil), OK: false}
|
|
}
|
|
|
|
sessionID := response.Header.Get("Mcp-Session-Id")
|
|
|
|
drainSSE(response)
|
|
|
|
notification := core.JSONMarshalString(map[string]any{
|
|
"jsonrpc": "2.0",
|
|
"method": "notifications/initialized",
|
|
})
|
|
notificationRequest, err := http.NewRequestWithContext(ctx, "POST", url, core.NewReader(notification))
|
|
if err != nil {
|
|
return core.Result{Value: core.E("mcpInitialize", "create notification request", err), OK: false}
|
|
}
|
|
mcpHeaders(notificationRequest, token, sessionID)
|
|
notificationResponse, err := defaultClient.Do(notificationRequest)
|
|
if err == nil {
|
|
notificationResponse.Body.Close()
|
|
}
|
|
|
|
return core.Result{Value: sessionID, OK: true}
|
|
}
|
|
|
|
func mcpCall(ctx context.Context, url, token, sessionID string, body []byte) ([]byte, error) {
|
|
result := mcpCallResult(ctx, url, token, sessionID, body)
|
|
if !result.OK {
|
|
err, _ := result.Value.(error)
|
|
if err == nil {
|
|
return nil, core.E("mcpCall", "failed", nil)
|
|
}
|
|
return nil, err
|
|
}
|
|
data, ok := result.Value.([]byte)
|
|
if !ok {
|
|
return nil, core.E("mcpCall", "invalid call result", nil)
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
func mcpCallResult(ctx context.Context, url, token, sessionID string, body []byte) core.Result {
|
|
request, err := http.NewRequestWithContext(ctx, "POST", url, core.NewReader(string(body)))
|
|
if err != nil {
|
|
return core.Result{Value: core.E("mcpCall", "create request", err), OK: false}
|
|
}
|
|
mcpHeaders(request, token, sessionID)
|
|
|
|
response, err := defaultClient.Do(request)
|
|
if err != nil {
|
|
return core.Result{Value: core.E("mcpCall", "request failed", err), OK: false}
|
|
}
|
|
defer response.Body.Close()
|
|
|
|
if response.StatusCode != 200 {
|
|
return core.Result{Value: core.E("mcpCall", core.Sprintf("HTTP %d", response.StatusCode), nil), OK: false}
|
|
}
|
|
|
|
return readSSEDataResult(response)
|
|
}
|
|
|
|
func readSSEData(response *http.Response) ([]byte, error) {
|
|
result := readSSEDataResult(response)
|
|
if !result.OK {
|
|
err, _ := result.Value.(error)
|
|
if err == nil {
|
|
return nil, core.E("readSSEData", "failed", nil)
|
|
}
|
|
return nil, err
|
|
}
|
|
data, ok := result.Value.([]byte)
|
|
if !ok {
|
|
return nil, core.E("readSSEData", "invalid data result", nil)
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
func readSSEDataResult(response *http.Response) core.Result {
|
|
readResult := core.ReadAll(response.Body)
|
|
if !readResult.OK {
|
|
err, _ := readResult.Value.(error)
|
|
return core.Result{Value: core.E("readSSEData", "failed to read response", err), OK: false}
|
|
}
|
|
for _, line := range core.Split(readResult.Value.(string), "\n") {
|
|
if core.HasPrefix(line, "data: ") {
|
|
return core.Result{Value: []byte(core.TrimPrefix(line, "data: ")), OK: true}
|
|
}
|
|
}
|
|
return core.Result{Value: core.E("readSSEData", "no data in SSE response", nil), OK: false}
|
|
}
|
|
|
|
func mcpHeaders(request *http.Request, token, sessionID string) {
|
|
request.Header.Set("Content-Type", "application/json")
|
|
request.Header.Set("Accept", "application/json, text/event-stream")
|
|
if token != "" {
|
|
request.Header.Set("Authorization", core.Concat("Bearer ", token))
|
|
}
|
|
if sessionID != "" {
|
|
request.Header.Set("Mcp-Session-Id", sessionID)
|
|
}
|
|
}
|
|
|
|
func drainSSE(response *http.Response) {
|
|
core.ReadAll(response.Body)
|
|
}
|