From 0fb1ef5cfb60fdcbabe716c6b25fa22cff58df5a Mon Sep 17 00:00:00 2001 From: Snider Date: Wed, 15 Apr 2026 22:12:19 +0100 Subject: [PATCH] Implement P2P, Deno, and container spec gaps --- pkg/container/service.go | 66 ++++++ pkg/deno/sidecar.go | 263 ++++++++++++++++++++- pkg/display/display.go | 15 +- pkg/display/p2p.go | 30 +++ pkg/display/scheme.go | 41 +++- pkg/display/sidecar.go | 56 +++++ pkg/mcp/subsystem.go | 3 + pkg/mcp/tools_container.go | 97 ++++++++ pkg/mcp/tools_deno.go | 103 ++++++++ pkg/mcp/tools_p2p.go | 69 ++++++ pkg/p2p/service.go | 123 ++++++++++ pkg/p2p/tcp.go | 147 ++++++++++++ ui/src/chat/chat-panel.component.ts | 87 +++++-- ui/src/main.ts | 3 + ui/src/web-components/chat-hero.element.ts | 76 ++++++ 15 files changed, 1150 insertions(+), 29 deletions(-) create mode 100644 pkg/container/service.go create mode 100644 pkg/display/p2p.go create mode 100644 pkg/mcp/tools_container.go create mode 100644 pkg/mcp/tools_deno.go create mode 100644 pkg/mcp/tools_p2p.go create mode 100644 pkg/p2p/service.go create mode 100644 pkg/p2p/tcp.go create mode 100644 ui/src/web-components/chat-hero.element.ts diff --git a/pkg/container/service.go b/pkg/container/service.go new file mode 100644 index 00000000..02d7fea5 --- /dev/null +++ b/pkg/container/service.go @@ -0,0 +1,66 @@ +package container + +import ( + "context" + "strings" + + core "dappco.re/go/core" +) + +type Service struct { + *core.ServiceRuntime[TIMOptions] + manager *TIMManager +} + +func NewService(c *core.Core, options TIMOptions) *Service { + return &Service{ + ServiceRuntime: core.NewServiceRuntime(c, options), + manager: NewTIMManager(options), + } +} + +func OptionsFromEnv() TIMOptions { + return TIMOptions{ + Name: strings.TrimSpace(core.Env("CORE_TIM_NAME")), + Image: strings.TrimSpace(core.Env("CORE_TIM_IMAGE")), + Command: splitCSV(strings.TrimSpace(core.Env("CORE_TIM_COMMAND"))), + DataDir: strings.TrimSpace(core.Env("CORE_TIM_DATA_DIR")), + } +} + +func (s *Service) OnStartup(_ context.Context) core.Result { + s.Core().Action("container.runtime.detect", func(_ context.Context, _ core.Options) core.Result { + return core.Result{Value: Detect(), OK: true} + }) + s.Core().Action("tim.start", func(ctx context.Context, _ core.Options) core.Result { + state, err := s.manager.Start(ctx) + return core.Result{}.New(state, err) + }) + s.Core().Action("tim.stop", func(ctx context.Context, _ core.Options) core.Result { + state, err := s.manager.Stop(ctx) + return core.Result{}.New(state, err) + }) + s.Core().Action("tim.status", func(_ context.Context, _ core.Options) core.Result { + return core.Result{Value: s.manager.State(), OK: true} + }) + return core.Result{OK: true} +} + +func (s *Service) State() TIMState { + return s.manager.State() +} + +func splitCSV(value string) []string { + if strings.TrimSpace(value) == "" { + return nil + } + parts := strings.Split(value, ",") + result := make([]string, 0, len(parts)) + for _, part := range parts { + part = strings.TrimSpace(part) + if part != "" { + result = append(result, part) + } + } + return result +} diff --git a/pkg/deno/sidecar.go b/pkg/deno/sidecar.go index 02d5cadc..c45991d1 100644 --- a/pkg/deno/sidecar.go +++ b/pkg/deno/sidecar.go @@ -1,13 +1,20 @@ package deno import ( + "bufio" "context" + "encoding/json" "errors" + "fmt" + "io" "os" "os/exec" "strings" "sync" + "sync/atomic" "syscall" + + core "dappco.re/go/core" ) type Options struct { @@ -15,25 +22,46 @@ type Options struct { Args []string Dir string Env []string + Core *core.Core } type Status struct { - Running bool `json:"running"` - PID int `json:"pid,omitempty"` - Binary string `json:"binary,omitempty"` + Running bool `json:"running"` + Connected bool `json:"connected"` + PID int `json:"pid,omitempty"` + Binary string `json:"binary,omitempty"` +} + +type EvalResult struct { + Value any `json:"value,omitempty"` +} + +type Event struct { + Name string `json:"name"` + Data any `json:"data,omitempty"` } type Manager struct { options Options mu sync.Mutex cmd *exec.Cmd + stdin io.WriteCloser + pending map[string]chan rpcMessage + events []func(Event) + nextID atomic.Uint64 } func New(options Options) *Manager { if strings.TrimSpace(options.Binary) == "" { options.Binary = "deno" } - return &Manager{options: options} + if len(options.Args) == 0 { + options.Args = []string{"eval", denoBridgeProgram} + } + return &Manager{ + options: options, + pending: make(map[string]chan rpcMessage), + } } func (m *Manager) Start(ctx context.Context) (Status, error) { @@ -46,10 +74,22 @@ func (m *Manager) Start(ctx context.Context) (Status, error) { cmd := exec.CommandContext(ctx, m.options.Binary, m.options.Args...) cmd.Dir = m.options.Dir cmd.Env = append(os.Environ(), m.options.Env...) + + stdin, err := cmd.StdinPipe() + if err != nil { + return Status{}, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return Status{}, err + } if err := cmd.Start(); err != nil { return Status{}, err } + m.cmd = cmd + m.stdin = stdin + go m.readLoop(stdout) return m.statusLocked(), nil } @@ -64,6 +104,11 @@ func (m *Manager) Stop(context.Context) (Status, error) { return m.statusLocked(), err } m.cmd = nil + m.stdin = nil + for id, ch := range m.pending { + close(ch) + delete(m.pending, id) + } return Status{}, nil } @@ -79,7 +124,217 @@ func (m *Manager) statusLocked() Status { } if m.cmd != nil && m.cmd.Process != nil { status.Running = true + status.Connected = m.stdin != nil status.PID = m.cmd.Process.Pid } return status } + +func (m *Manager) OnEvent(handler func(Event)) { + if handler == nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + m.events = append(m.events, handler) +} + +func (m *Manager) Eval(ctx context.Context, code string) (EvalResult, error) { + response, err := m.request(ctx, rpcMessage{Type: "eval", Code: code}) + if err != nil { + return EvalResult{}, err + } + return EvalResult{Value: response.Result}, nil +} + +func (m *Manager) Emit(name string, data any) error { + if strings.TrimSpace(name) == "" { + return errors.New("event name is required") + } + return m.send(rpcMessage{Type: "event", Name: name, Data: data}) +} + +func (m *Manager) request(ctx context.Context, message rpcMessage) (rpcMessage, error) { + m.mu.Lock() + if m.stdin == nil { + m.mu.Unlock() + return rpcMessage{}, errors.New("deno sidecar is not running") + } + message.ID = fmt.Sprintf("deno-%d", m.nextID.Add(1)) + responseCh := make(chan rpcMessage, 1) + m.pending[message.ID] = responseCh + payload, err := json.Marshal(message) + if err != nil { + delete(m.pending, message.ID) + m.mu.Unlock() + return rpcMessage{}, err + } + _, err = m.stdin.Write(append(payload, '\n')) + m.mu.Unlock() + if err != nil { + return rpcMessage{}, err + } + select { + case <-ctx.Done(): + return rpcMessage{}, ctx.Err() + case response, ok := <-responseCh: + if !ok { + return rpcMessage{}, errors.New("deno sidecar disconnected") + } + if !response.OK { + return rpcMessage{}, errors.New(strings.TrimSpace(response.Error)) + } + return response, nil + } +} + +func (m *Manager) readLoop(stdout io.Reader) { + scanner := bufio.NewScanner(stdout) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + var message rpcMessage + if err := json.Unmarshal(scanner.Bytes(), &message); err != nil { + continue + } + m.handleMessage(message) + } +} + +func (m *Manager) handleMessage(message rpcMessage) { + switch message.Type { + case "result": + m.mu.Lock() + ch := m.pending[message.ID] + delete(m.pending, message.ID) + m.mu.Unlock() + if ch != nil { + ch <- message + close(ch) + } + case "event": + m.mu.Lock() + handlers := append([]func(Event){}, m.events...) + m.mu.Unlock() + for _, handler := range handlers { + handler(Event{Name: message.Name, Data: message.Data}) + } + case "action": + m.handleAction(message) + } +} + +func (m *Manager) handleAction(message rpcMessage) { + response := rpcMessage{Type: "result", ID: message.ID} + if m.options.Core == nil { + response.Error = "core is unavailable" + _ = m.send(response) + return + } + opts := core.NewOptions() + for key, value := range message.Options { + opts.Set(key, value) + } + result := m.options.Core.Action(message.Name).Run(context.Background(), opts) + response.OK = result.OK + if result.OK { + response.Result = result.Value + } else if err, ok := result.Value.(error); ok { + response.Error = err.Error() + } else { + response.Error = fmt.Sprint(result.Value) + } + _ = m.send(response) +} + +func (m *Manager) send(message rpcMessage) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.stdin == nil { + return errors.New("deno sidecar is not running") + } + payload, err := json.Marshal(message) + if err != nil { + return err + } + _, err = m.stdin.Write(append(payload, '\n')) + return err +} + +type rpcMessage struct { + Type string `json:"type"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Code string `json:"code,omitempty"` + Data any `json:"data,omitempty"` + Options map[string]any `json:"options,omitempty"` + OK bool `json:"ok,omitempty"` + Result any `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +const denoBridgeProgram = `const encoder = new TextEncoder(); +const decoder = new TextDecoder(); +globalThis.core = { + emit(name, data) { + return send({ type: "event", name, data }); + }, + action(name, options = {}) { + return request({ type: "action", name, options }); + }, +}; +const pending = new Map(); +async function send(message) { + await Deno.stdout.write(encoder.encode(JSON.stringify(message) + "\n")); +} +function request(message) { + const id = crypto.randomUUID(); + return new Promise((resolve, reject) => { + pending.set(id, { resolve, reject }); + send({ ...message, id }).catch((error) => { + pending.delete(id); + reject(error); + }); + }); +} +async function handle(message) { + if (message.type === "eval") { + try { + const value = await (0, eval)(message.code); + await send({ type: "result", id: message.id, ok: true, result: value }); + } catch (error) { + await send({ type: "result", id: message.id, ok: false, error: String(error?.stack ?? error) }); + } + return; + } + if (message.type === "event") { + globalThis.dispatchEvent(new CustomEvent(message.name || "core.event", { detail: message.data ?? null })); + await send({ type: "result", id: message.id, ok: true, result: null }); + return; + } + if (message.type === "result") { + const pendingRequest = pending.get(message.id); + if (!pendingRequest) return; + pending.delete(message.id); + if (message.ok) { + pendingRequest.resolve(message.result); + } else { + pendingRequest.reject(new Error(message.error || "deno sidecar request failed")); + } + } +} +let buffer = ""; +while (true) { + const chunk = new Uint8Array(4096); + const read = await Deno.stdin.read(chunk); + if (read === null) break; + buffer += decoder.decode(chunk.subarray(0, read)); + let newline = buffer.indexOf("\n"); + while (newline >= 0) { + const line = buffer.slice(0, newline).trim(); + buffer = buffer.slice(newline + 1); + if (line) { + await handle(JSON.parse(line)); + } + newline = buffer.indexOf("\n"); + } +}` diff --git a/pkg/display/display.go b/pkg/display/display.go index 36064cb0..91dc69bd 100644 --- a/pkg/display/display.go +++ b/pkg/display/display.go @@ -11,8 +11,8 @@ import ( "forge.lthn.ai/core/config" "forge.lthn.ai/core/gui/pkg/chat" - "forge.lthn.ai/core/gui/pkg/container" "forge.lthn.ai/core/gui/pkg/clipboard" + "forge.lthn.ai/core/gui/pkg/container" "forge.lthn.ai/core/gui/pkg/contextmenu" "forge.lthn.ai/core/gui/pkg/deno" "forge.lthn.ai/core/gui/pkg/dialog" @@ -81,7 +81,16 @@ func Register(wailsApp *application.App) func(*core.Core) core.Result { } s.ServiceRuntime = core.NewServiceRuntime[Options](c, Options{}) s.wailsApp = wailsApp - return core.Result{Value: s, OK: true} + if result := c.RegisterService("display", s); !result.OK { + return result + } + if !c.Service("deno").OK { + _ = c.RegisterService("deno", s.ensureSidecar()) + } + if !c.Service("tim").OK { + _ = c.RegisterService("tim", container.NewService(c, container.OptionsFromEnv())) + } + return core.Result{OK: true} } } @@ -166,6 +175,7 @@ func (s *Service) OnStartup(_ context.Context) core.Result { s.registerMarketplaceActions() s.registerSidecarActions() s.registerDefaultSchemes() + s.attachP2PBridge() // Initialise Wails wrappers if app is available (nil in tests) if s.wailsApp != nil { @@ -189,6 +199,7 @@ func (s *Service) OnShutdown(ctx context.Context) core.Result { // HandleIPCEvents bridges IPC actions from sub-services to WebSocket events for TS apps. func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { + s.forwardIPCToSidecar(msg) switch m := msg.(type) { case core.ActionServiceStartup: // All services have completed OnStartup — safe to call sub-services diff --git a/pkg/display/p2p.go b/pkg/display/p2p.go new file mode 100644 index 00000000..bcd46fd1 --- /dev/null +++ b/pkg/display/p2p.go @@ -0,0 +1,30 @@ +package display + +import ( + "context" + + core "dappco.re/go/core" + "forge.lthn.ai/core/gui/pkg/p2p" +) + +func (s *Service) attachP2PBridge() { + router, ok := core.ServiceFor[*p2p.Service](s.Core(), "p2p") + if !ok || router == nil { + return + } + _ = router.Subscribe(context.Background(), "display", func(envelope p2p.Envelope) { + if s.events == nil { + return + } + s.events.Emit(Event{ + Type: EventCustomEvent, + Data: map[string]any{ + "source": "p2p", + "topic": envelope.Topic, + "route": envelope.Route, + "sender_id": envelope.SenderID, + "payload": envelope.Payload, + }, + }) + }) +} diff --git a/pkg/display/scheme.go b/pkg/display/scheme.go index 0717035f..66132b70 100644 --- a/pkg/display/scheme.go +++ b/pkg/display/scheme.go @@ -389,7 +389,7 @@ func (s *Service) ResolveScheme(ctx context.Context, rawURL string) core.Result } func (s *Service) renderSchemeBody(route string, value any) string { - title := "core://" + route + title := buildCoreURL(route, nil) pretty := core.JSONMarshalString(value) return "" + html.EscapeString(title) + @@ -550,7 +550,7 @@ func (s *Service) handleStoreSearch(_ context.Context, params url.Values) core.R "content_type": "text/html", "body": s.renderStoreSearchPage(query, results), "route": "store", - "url": "core://store", + "url": buildCoreURL("store", nil), "query": params, "results": results, }, @@ -568,16 +568,49 @@ func coalesce(values ...string) string { } func coreRouteURL(segment string, parts ...string) string { - route := "core://" + strings.Trim(strings.TrimSpace(segment), "/") + return buildCoreURL(pathForCoreRoute(segment, parts...), nil) +} + +func buildCoreURL(route string, query url.Values) string { + route = strings.Trim(strings.TrimSpace(route), "/") + if route == "" { + return "core://" + } + built := "core://" + route + if encoded := sanitizeCoreQuery(query).Encode(); encoded != "" { + built += "?" + encoded + } + return built +} + +func pathForCoreRoute(segment string, parts ...string) string { + route := strings.Trim(strings.TrimSpace(segment), "/") for _, part := range parts { if strings.TrimSpace(part) == "" { continue } - route += "/" + url.PathEscape(part) + route += "/" + url.PathEscape(strings.TrimSpace(part)) } return route } +func sanitizeCoreQuery(query url.Values) url.Values { + if len(query) == 0 { + return nil + } + sanitized := make(url.Values, len(query)) + for key, values := range query { + key = strings.TrimSpace(key) + if key == "" { + continue + } + for _, value := range values { + sanitized.Add(key, value) + } + } + return sanitized +} + func safeOriginHref(origin string) string { trimmed := strings.TrimSpace(origin) if trimmed == "" { diff --git a/pkg/display/sidecar.go b/pkg/display/sidecar.go index 755f39b4..5689efac 100644 --- a/pkg/display/sidecar.go +++ b/pkg/display/sidecar.go @@ -2,6 +2,7 @@ package display import ( "context" + "reflect" "strings" core "dappco.re/go/core" @@ -38,14 +39,40 @@ func (s *Service) registerSidecarActions() { s.Core().Action("core.deno.sidecar.status", func(_ context.Context, _ core.Options) core.Result { return core.Result{Value: s.ensureSidecar().Status(), OK: true} }) + s.Core().Action("display.sidecar.eval", func(ctx context.Context, opts core.Options) core.Result { + result, err := s.ensureSidecar().Eval(ctx, opts.String("code")) + return core.Result{}.New(result, err) + }) + s.Core().Action("core.deno.sidecar.eval", func(ctx context.Context, opts core.Options) core.Result { + result, err := s.ensureSidecar().Eval(ctx, opts.String("code")) + return core.Result{}.New(result, err) + }) } func (s *Service) ensureSidecar() *deno.Manager { if s.sidecar == nil { + var coreRef *core.Core + if s != nil && s.ServiceRuntime != nil { + coreRef = s.Core() + } s.sidecar = deno.New(deno.Options{ Binary: strings.TrimSpace(core.Env("CORE_DENO_BINARY")), Dir: strings.TrimSpace(core.Env("CORE_DENO_DIR")), Args: splitCommandArgs(core.Env("CORE_DENO_ARGS")), + Core: coreRef, + }) + s.sidecar.OnEvent(func(event deno.Event) { + if s.events == nil { + return + } + s.events.Emit(Event{ + Type: EventCustomEvent, + Data: map[string]any{ + "source": "deno", + "name": event.Name, + "data": event.Data, + }, + }) }) } return s.sidecar @@ -58,3 +85,32 @@ func splitCommandArgs(value string) []string { } return fields } + +func (s *Service) forwardIPCToSidecar(msg core.Message) { + if s == nil || s.sidecar == nil { + return + } + status := s.sidecar.Status() + if !status.Running || !status.Connected { + return + } + typeName := "" + if t := reflect.TypeOf(msg); t != nil { + typeName = t.String() + } + _ = s.sidecar.Emit("core.ipc.message", map[string]any{ + "type": typeName, + "data": normalizeSidecarValue(msg), + }) +} + +func normalizeSidecarValue(value any) any { + if value == nil { + return nil + } + var normalized any + if result := core.JSONUnmarshalString(core.JSONMarshalString(value), &normalized); result.OK { + return normalized + } + return map[string]any{"value": core.JSONMarshalString(value)} +} diff --git a/pkg/mcp/subsystem.go b/pkg/mcp/subsystem.go index fe1db767..73185150 100644 --- a/pkg/mcp/subsystem.go +++ b/pkg/mcp/subsystem.go @@ -63,6 +63,9 @@ func (s *Subsystem) RegisterTools(server *mcp.Server) { s.registerMarketplaceTools(server) s.registerEventsTools(server) s.registerMenuTools(server) + s.registerP2PTools(server) + s.registerDenoTools(server) + s.registerContainerTools(server) } // Manifest returns the recorded MCP tool metadata in stable name order. diff --git a/pkg/mcp/tools_container.go b/pkg/mcp/tools_container.go new file mode 100644 index 00000000..95373685 --- /dev/null +++ b/pkg/mcp/tools_container.go @@ -0,0 +1,97 @@ +package mcp + +import ( + "context" + + core "dappco.re/go/core" + coreerr "dappco.re/go/core/log" + "forge.lthn.ai/core/gui/pkg/container" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +type ContainerDetectInput struct{} +type ContainerDetectOutput struct { + Runtime container.ContainerRuntime `json:"runtime"` +} + +func (s *Subsystem) containerDetect(_ context.Context, _ *mcp.CallToolRequest, _ ContainerDetectInput) (*mcp.CallToolResult, ContainerDetectOutput, error) { + result := s.core.Action("container.runtime.detect").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, ContainerDetectOutput{}, err + } + return nil, ContainerDetectOutput{}, coreerr.E("mcp.containerDetect", "container.runtime.detect failed", nil) + } + runtime, ok := result.Value.(container.ContainerRuntime) + if !ok { + return nil, ContainerDetectOutput{}, coreerr.E("mcp.containerDetect", "unexpected result type", nil) + } + return nil, ContainerDetectOutput{Runtime: runtime}, nil +} + +type TIMStateInput struct{} +type TIMStateOutput struct { + State container.TIMState `json:"state"` +} + +func (s *Subsystem) timStatus(_ context.Context, _ *mcp.CallToolRequest, _ TIMStateInput) (*mcp.CallToolResult, TIMStateOutput, error) { + result := s.core.Action("tim.status").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, TIMStateOutput{}, err + } + return nil, TIMStateOutput{}, coreerr.E("mcp.timStatus", "tim.status failed", nil) + } + state, ok := result.Value.(container.TIMState) + if !ok { + return nil, TIMStateOutput{}, coreerr.E("mcp.timStatus", "unexpected result type", nil) + } + return nil, TIMStateOutput{State: state}, nil +} + +type TIMStartInput struct{} +type TIMStartOutput struct { + State container.TIMState `json:"state"` +} + +func (s *Subsystem) timStart(_ context.Context, _ *mcp.CallToolRequest, _ TIMStartInput) (*mcp.CallToolResult, TIMStartOutput, error) { + result := s.core.Action("tim.start").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, TIMStartOutput{}, err + } + return nil, TIMStartOutput{}, coreerr.E("mcp.timStart", "tim.start failed", nil) + } + state, ok := result.Value.(container.TIMState) + if !ok { + return nil, TIMStartOutput{}, coreerr.E("mcp.timStart", "unexpected result type", nil) + } + return nil, TIMStartOutput{State: state}, nil +} + +type TIMStopInput struct{} +type TIMStopOutput struct { + State container.TIMState `json:"state"` +} + +func (s *Subsystem) timStop(_ context.Context, _ *mcp.CallToolRequest, _ TIMStopInput) (*mcp.CallToolResult, TIMStopOutput, error) { + result := s.core.Action("tim.stop").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, TIMStopOutput{}, err + } + return nil, TIMStopOutput{}, coreerr.E("mcp.timStop", "tim.stop failed", nil) + } + state, ok := result.Value.(container.TIMState) + if !ok { + return nil, TIMStopOutput{}, coreerr.E("mcp.timStop", "unexpected result type", nil) + } + return nil, TIMStopOutput{State: state}, nil +} + +func (s *Subsystem) registerContainerTools(server *mcp.Server) { + addTool(s, server, &mcp.Tool{Name: "container_detect_runtime", Description: "Detect the preferred isolated workload runtime on this host"}, s.containerDetect) + addTool(s, server, &mcp.Tool{Name: "tim_status", Description: "Inspect the TIM container state"}, s.timStatus) + addTool(s, server, &mcp.Tool{Name: "tim_start", Description: "Start the TIM container"}, s.timStart) + addTool(s, server, &mcp.Tool{Name: "tim_stop", Description: "Stop the TIM container"}, s.timStop) +} diff --git a/pkg/mcp/tools_deno.go b/pkg/mcp/tools_deno.go new file mode 100644 index 00000000..10379ee7 --- /dev/null +++ b/pkg/mcp/tools_deno.go @@ -0,0 +1,103 @@ +package mcp + +import ( + "context" + + core "dappco.re/go/core" + coreerr "dappco.re/go/core/log" + "forge.lthn.ai/core/gui/pkg/deno" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +type DenoStatusInput struct{} + +type DenoStatusOutput struct { + Status deno.Status `json:"status"` +} + +func (s *Subsystem) denoStatus(_ context.Context, _ *mcp.CallToolRequest, _ DenoStatusInput) (*mcp.CallToolResult, DenoStatusOutput, error) { + result := s.core.Action("core.deno.sidecar.status").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, DenoStatusOutput{}, err + } + return nil, DenoStatusOutput{}, coreerr.E("mcp.denoStatus", "core.deno.sidecar.status failed", nil) + } + status, ok := result.Value.(deno.Status) + if !ok { + return nil, DenoStatusOutput{}, coreerr.E("mcp.denoStatus", "unexpected result type", nil) + } + return nil, DenoStatusOutput{Status: status}, nil +} + +type DenoStartInput struct{} +type DenoStartOutput struct { + Status deno.Status `json:"status"` +} + +func (s *Subsystem) denoStart(_ context.Context, _ *mcp.CallToolRequest, _ DenoStartInput) (*mcp.CallToolResult, DenoStartOutput, error) { + result := s.core.Action("core.deno.sidecar.start").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, DenoStartOutput{}, err + } + return nil, DenoStartOutput{}, coreerr.E("mcp.denoStart", "core.deno.sidecar.start failed", nil) + } + status, ok := result.Value.(deno.Status) + if !ok { + return nil, DenoStartOutput{}, coreerr.E("mcp.denoStart", "unexpected result type", nil) + } + return nil, DenoStartOutput{Status: status}, nil +} + +type DenoStopInput struct{} +type DenoStopOutput struct { + Status deno.Status `json:"status"` +} + +func (s *Subsystem) denoStop(_ context.Context, _ *mcp.CallToolRequest, _ DenoStopInput) (*mcp.CallToolResult, DenoStopOutput, error) { + result := s.core.Action("core.deno.sidecar.stop").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, DenoStopOutput{}, err + } + return nil, DenoStopOutput{}, coreerr.E("mcp.denoStop", "core.deno.sidecar.stop failed", nil) + } + status, ok := result.Value.(deno.Status) + if !ok { + return nil, DenoStopOutput{}, coreerr.E("mcp.denoStop", "unexpected result type", nil) + } + return nil, DenoStopOutput{Status: status}, nil +} + +type DenoEvalInput struct { + Code string `json:"code"` +} + +type DenoEvalOutput struct { + Result deno.EvalResult `json:"result"` +} + +func (s *Subsystem) denoEval(_ context.Context, _ *mcp.CallToolRequest, input DenoEvalInput) (*mcp.CallToolResult, DenoEvalOutput, error) { + result := s.core.Action("core.deno.sidecar.eval").Run(context.Background(), core.NewOptions( + core.Option{Key: "code", Value: input.Code}, + )) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, DenoEvalOutput{}, err + } + return nil, DenoEvalOutput{}, coreerr.E("mcp.denoEval", "core.deno.sidecar.eval failed", nil) + } + value, ok := result.Value.(deno.EvalResult) + if !ok { + return nil, DenoEvalOutput{}, coreerr.E("mcp.denoEval", "unexpected result type", nil) + } + return nil, DenoEvalOutput{Result: value}, nil +} + +func (s *Subsystem) registerDenoTools(server *mcp.Server) { + addTool(s, server, &mcp.Tool{Name: "deno_status", Description: "Inspect the CoreDeno sidecar process and IPC connection state"}, s.denoStatus) + addTool(s, server, &mcp.Tool{Name: "deno_start", Description: "Start the CoreDeno sidecar process"}, s.denoStart) + addTool(s, server, &mcp.Tool{Name: "deno_stop", Description: "Stop the CoreDeno sidecar process"}, s.denoStop) + addTool(s, server, &mcp.Tool{Name: "deno_eval", Description: `Evaluate JavaScript inside the CoreDeno sidecar. Example: {"code":"await core.action('display.models.state')"} `}, s.denoEval) +} diff --git a/pkg/mcp/tools_p2p.go b/pkg/mcp/tools_p2p.go new file mode 100644 index 00000000..cb0a2f90 --- /dev/null +++ b/pkg/mcp/tools_p2p.go @@ -0,0 +1,69 @@ +package mcp + +import ( + "context" + + core "dappco.re/go/core" + coreerr "dappco.re/go/core/log" + "forge.lthn.ai/core/gui/pkg/p2p" + "github.com/modelcontextprotocol/go-sdk/mcp" +) + +type P2PPublishInput struct { + Topic string `json:"topic"` + Route string `json:"route,omitempty"` + SenderID string `json:"sender_id,omitempty"` + Payload map[string]any `json:"payload,omitempty"` +} + +type P2PPublishOutput struct { + Success bool `json:"success"` +} + +func (s *Subsystem) p2pPublish(_ context.Context, _ *mcp.CallToolRequest, input P2PPublishInput) (*mcp.CallToolResult, P2PPublishOutput, error) { + result := s.core.Action("p2p.publish").Run(context.Background(), core.NewOptions( + core.Option{Key: "topic", Value: input.Topic}, + core.Option{Key: "route", Value: input.Route}, + core.Option{Key: "sender_id", Value: input.SenderID}, + core.Option{Key: "payload", Value: input.Payload}, + )) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, P2PPublishOutput{}, err + } + return nil, P2PPublishOutput{}, coreerr.E("mcp.p2pPublish", "p2p.publish failed", nil) + } + return nil, P2PPublishOutput{Success: true}, nil +} + +type P2PStateInput struct{} + +type P2PStateOutput struct { + State p2p.State `json:"state"` +} + +func (s *Subsystem) p2pState(_ context.Context, _ *mcp.CallToolRequest, _ P2PStateInput) (*mcp.CallToolResult, P2PStateOutput, error) { + result := s.core.Action("p2p.state").Run(context.Background(), core.Options{}) + if !result.OK { + if err, ok := result.Value.(error); ok { + return nil, P2PStateOutput{}, err + } + return nil, P2PStateOutput{}, coreerr.E("mcp.p2pState", "p2p.state failed", nil) + } + state, ok := result.Value.(p2p.State) + if !ok { + return nil, P2PStateOutput{}, coreerr.E("mcp.p2pState", "unexpected result type", nil) + } + return nil, P2PStateOutput{State: state}, nil +} + +func (s *Subsystem) registerP2PTools(server *mcp.Server) { + addTool(s, server, &mcp.Tool{ + Name: "p2p_publish", + Description: `Publish a P2P envelope over the configured transport. Example: {"topic":"display","route":"chat.sync","payload":{"message":"hello"}}`, + }, s.p2pPublish) + addTool(s, server, &mcp.Tool{ + Name: "p2p_state", + Description: "Inspect the configured P2P node state, listen address, and observed peers", + }, s.p2pState) +} diff --git a/pkg/p2p/service.go b/pkg/p2p/service.go new file mode 100644 index 00000000..ed402173 --- /dev/null +++ b/pkg/p2p/service.go @@ -0,0 +1,123 @@ +package p2p + +import ( + "context" + "strings" + + core "dappco.re/go/core" +) + +type Options struct { + ListenAddr string + PeerAddrs []string + NodeID string +} + +type Service struct { + *core.ServiceRuntime[Options] + router *Router + driver *TCPDriver +} + +type State struct { + NodeID string `json:"node_id"` + ListenAddr string `json:"listen_addr,omitempty"` + Peers []Peer `json:"peers,omitempty"` +} + +func NewService(c *core.Core, options Options) *Service { + driver := NewTCPDriver(TCPOptions{ + ListenAddr: options.ListenAddr, + PeerAddrs: options.PeerAddrs, + NodeID: options.NodeID, + }) + return &Service{ + ServiceRuntime: core.NewServiceRuntime(c, options), + router: New(driver), + driver: driver, + } +} + +func OptionsFromEnv() Options { + peers := strings.Split(strings.TrimSpace(core.Env("CORE_P2P_PEERS")), ",") + filtered := make([]string, 0, len(peers)) + for _, peer := range peers { + peer = strings.TrimSpace(peer) + if peer != "" { + filtered = append(filtered, peer) + } + } + return Options{ + ListenAddr: strings.TrimSpace(core.Env("CORE_P2P_ADDR")), + PeerAddrs: filtered, + NodeID: strings.TrimSpace(core.Env("CORE_P2P_NODE_ID")), + } +} + +func (s *Service) OnStartup(_ context.Context) core.Result { + s.Core().Action("p2p.publish", func(ctx context.Context, opts core.Options) core.Result { + payload := mapValue(opts, "payload") + envelope := Envelope{ + Topic: opts.String("topic"), + Route: opts.String("route"), + SenderID: coalesce(opts.String("sender_id"), s.Options().NodeID), + Payload: payload, + } + return core.Result{}.New(nil, s.Publish(ctx, envelope)) + }) + s.Core().Action("p2p.state", func(_ context.Context, _ core.Options) core.Result { + return core.Result{Value: s.State(), OK: true} + }) + return core.Result{OK: true} +} + +func (s *Service) OnShutdown(_ context.Context) core.Result { + return core.Result{}.New(nil, s.driver.Close()) +} + +func (s *Service) Publish(ctx context.Context, envelope Envelope) error { + return s.router.Publish(ctx, envelope) +} + +func (s *Service) Subscribe(ctx context.Context, topic string, handler func(Envelope)) error { + return s.router.Subscribe(ctx, topic, handler) +} + +func (s *Service) Peers() []Peer { + return s.router.Peers() +} + +func (s *Service) State() State { + return State{ + NodeID: s.Options().NodeID, + ListenAddr: s.driver.ListenAddr(), + Peers: s.Peers(), + } +} + +func mapValue(opts core.Options, key string) map[string]any { + result := opts.Get(key) + if !result.OK { + return nil + } + value := result.Value + switch typed := value.(type) { + case map[string]any: + return typed + default: + var normalized map[string]any + if result := core.JSONUnmarshalString(core.JSONMarshalString(typed), &normalized); result.OK { + return normalized + } + return map[string]any{"value": typed} + } +} + +func coalesce(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return value + } + } + return "" +} diff --git a/pkg/p2p/tcp.go b/pkg/p2p/tcp.go new file mode 100644 index 00000000..3fce4474 --- /dev/null +++ b/pkg/p2p/tcp.go @@ -0,0 +1,147 @@ +package p2p + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "net" + "strings" + "sync" +) + +type TCPOptions struct { + ListenAddr string + PeerAddrs []string + NodeID string +} + +type TCPDriver struct { + options TCPOptions + mu sync.RWMutex + listener net.Listener + subscriptions map[string][]func(Envelope) +} + +func NewTCPDriver(options TCPOptions) *TCPDriver { + return &TCPDriver{ + options: TCPOptions{ + ListenAddr: strings.TrimSpace(options.ListenAddr), + PeerAddrs: append([]string(nil), options.PeerAddrs...), + NodeID: strings.TrimSpace(options.NodeID), + }, + subscriptions: make(map[string][]func(Envelope)), + } +} + +func (d *TCPDriver) ListenAddr() string { + d.mu.RLock() + defer d.mu.RUnlock() + if d.listener != nil { + return d.listener.Addr().String() + } + return d.options.ListenAddr +} + +func (d *TCPDriver) Subscribe(_ context.Context, topic string, handler func(Envelope)) error { + topic = strings.TrimSpace(topic) + if topic == "" { + return errors.New("topic is required") + } + if handler == nil { + return errors.New("handler is required") + } + d.mu.Lock() + d.subscriptions[topic] = append(d.subscriptions[topic], handler) + d.mu.Unlock() + return d.ensureListener() +} + +func (d *TCPDriver) Publish(ctx context.Context, envelope Envelope) error { + if strings.TrimSpace(envelope.Topic) == "" { + return errors.New("topic is required") + } + if strings.TrimSpace(envelope.SenderID) == "" { + envelope.SenderID = d.options.NodeID + } + d.dispatch(envelope) + payload, err := json.Marshal(envelope) + if err != nil { + return err + } + for _, peer := range d.options.PeerAddrs { + peer = strings.TrimSpace(peer) + if peer == "" { + continue + } + conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", peer) + if err != nil { + return err + } + if _, err := conn.Write(append(payload, '\n')); err != nil { + _ = conn.Close() + return err + } + _ = conn.Close() + } + return nil +} + +func (d *TCPDriver) Close() error { + d.mu.Lock() + defer d.mu.Unlock() + if d.listener == nil { + return nil + } + err := d.listener.Close() + d.listener = nil + return err +} + +func (d *TCPDriver) ensureListener() error { + d.mu.Lock() + defer d.mu.Unlock() + if d.listener != nil || strings.TrimSpace(d.options.ListenAddr) == "" { + return nil + } + listener, err := net.Listen("tcp", d.options.ListenAddr) + if err != nil { + return err + } + d.listener = listener + go d.acceptLoop(listener) + return nil +} + +func (d *TCPDriver) acceptLoop(listener net.Listener) { + for { + conn, err := listener.Accept() + if err != nil { + return + } + go d.readConn(conn) + } +} + +func (d *TCPDriver) readConn(conn net.Conn) { + defer conn.Close() + scanner := bufio.NewScanner(conn) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + var envelope Envelope + if err := json.Unmarshal(scanner.Bytes(), &envelope); err != nil { + continue + } + d.dispatch(envelope) + } +} + +func (d *TCPDriver) dispatch(envelope Envelope) { + d.mu.RLock() + handlers := append([]func(Envelope){}, d.subscriptions[envelope.Topic]...) + handlers = append(handlers, d.subscriptions["*"]...) + d.mu.RUnlock() + for _, handler := range handlers { + handler(envelope) + } +} diff --git a/ui/src/chat/chat-panel.component.ts b/ui/src/chat/chat-panel.component.ts index e22b1848..71491218 100644 --- a/ui/src/chat/chat-panel.component.ts +++ b/ui/src/chat/chat-panel.component.ts @@ -35,10 +35,11 @@ import { ChatStateService } from './chat-state.service'; <main class="chat-shell"> <header class="chat-shell__header"> - <div> - <p class="eyebrow">CoreGUI Chat</p> - <h1>{{ state.activeConversation()?.title || 'Local chat' }}</h1> - </div> + <core-chat-hero + eyebrow="CoreGUI Chat" + [attr.title]="state.activeConversation()?.title || 'Local chat'" + subtitle="Shadow-DOM shell for the native Web Components migration." + /> <div class="chat-shell__controls"> <chat-model-selector [models]="state.models()" @@ -46,7 +47,12 @@ import { ChatStateService } from './chat-state.service'; [loading]="state.modelSwitching()" (valueChange)="state.changeModel($event)" /> - <wa-button type="button" class="settings" appearance="filled" (click)="state.settingsOpen.set(!state.settingsOpen())"> + <wa-button + type="button" + class="settings" + appearance="filled" + (click)="state.settingsOpen.set(!state.settingsOpen())" + > Settings </wa-button> </div> @@ -85,21 +91,64 @@ import { ChatStateService } from './chat-state.service'; `, styles: [ ` - :host { display: block; min-height: 100vh; color: #f8fafc; background: - radial-gradient(circle at top left, rgba(245, 158, 11, 0.18), transparent 30%), - radial-gradient(circle at right, rgba(14, 165, 233, 0.16), transparent 24%), - linear-gradient(160deg, #020617 0%, #081121 46%, #111827 100%); - font-family: 'Iowan Old Style', 'Palatino Linotype', 'Book Antiqua', serif; } - .workspace { min-height: 100vh; display: grid; grid-template-columns: 20rem 1fr; } - .chat-shell { min-height: 0; display: grid; grid-template-rows: auto auto minmax(0, 1fr) auto; gap: 1rem; padding: 1.5rem; } - .chat-shell__header { display: flex; justify-content: space-between; gap: 1rem; align-items: end; } - .chat-shell__controls { display: flex; flex-wrap: wrap; gap: 0.75rem; align-items: center; } - .chat-shell__thread { min-height: 0; overflow: hidden; padding: 1rem 0.2rem 1rem 0; } - .eyebrow { margin: 0; color: #f59e0b; text-transform: uppercase; letter-spacing: 0.18em; font-size: 0.72rem; } - h1 { margin: 0.2rem 0 0; font-size: clamp(2rem, 3vw, 3rem); line-height: 1; } - .settings { border: 1px solid rgba(251, 191, 36, 0.22); border-radius: 999px; background: rgba(124, 45, 18, 0.25); color: #fde68a; padding: 0.85rem 1.2rem; cursor: pointer; } + :host { + display: block; + min-height: 100vh; + color: #f8fafc; + background: + radial-gradient(circle at top left, rgba(245, 158, 11, 0.18), transparent 30%), + radial-gradient(circle at right, rgba(14, 165, 233, 0.16), transparent 24%), + linear-gradient(160deg, #020617 0%, #081121 46%, #111827 100%); + font-family: 'Iowan Old Style', 'Palatino Linotype', 'Book Antiqua', serif; + } + .workspace { + min-height: 100vh; + display: grid; + grid-template-columns: 20rem 1fr; + } + .chat-shell { + min-height: 0; + display: grid; + grid-template-rows: auto auto minmax(0, 1fr) auto; + gap: 1rem; + padding: 1.5rem; + } + .chat-shell__header { + display: flex; + justify-content: space-between; + gap: 1rem; + align-items: start; + } + .chat-shell__controls { + display: flex; + flex-wrap: wrap; + gap: 0.75rem; + align-items: center; + } + .chat-shell__thread { + min-height: 0; + overflow: hidden; + padding: 1rem 0.2rem 1rem 0; + } + core-chat-hero { + flex: 1 1 auto; + min-width: 18rem; + } + .settings { + border: 1px solid rgba(251, 191, 36, 0.22); + border-radius: 999px; + background: rgba(124, 45, 18, 0.25); + color: #fde68a; + padding: 0.85rem 1.2rem; + cursor: pointer; + } @media (max-width: 960px) { - .workspace { grid-template-columns: 1fr; } + .workspace { + grid-template-columns: 1fr; + } + .chat-shell__header { + flex-direction: column; + } } `, ], diff --git a/ui/src/main.ts b/ui/src/main.ts index 3d7b3d32..39c553f6 100644 --- a/ui/src/main.ts +++ b/ui/src/main.ts @@ -3,6 +3,9 @@ import '@awesome.me/webawesome'; import { platformBrowser } from '@angular/platform-browser'; import { AppModule } from './app/app-module'; +import { registerChatHeroElement } from './web-components/chat-hero.element'; + +registerChatHeroElement(); platformBrowser() .bootstrapModule(AppModule, { diff --git a/ui/src/web-components/chat-hero.element.ts b/ui/src/web-components/chat-hero.element.ts new file mode 100644 index 00000000..0642596d --- /dev/null +++ b/ui/src/web-components/chat-hero.element.ts @@ -0,0 +1,76 @@ +const template = document.createElement('template'); +template.innerHTML = ` + <style> + :host { + display: block; + border: 1px solid rgba(251, 191, 36, 0.16); + border-radius: 24px; + padding: 1.2rem 1.35rem; + background: + linear-gradient(135deg, rgba(120, 53, 15, 0.38), rgba(15, 23, 42, 0.88)), + radial-gradient(circle at top right, rgba(125, 211, 252, 0.18), transparent 36%); + box-shadow: 0 18px 50px rgba(2, 6, 23, 0.32); + } + .eyebrow { + margin: 0; + color: #fbbf24; + text-transform: uppercase; + letter-spacing: 0.18em; + font: 600 0.72rem/1.2 "Avenir Next Condensed", "Gill Sans", sans-serif; + } + h1 { + margin: 0.35rem 0 0; + color: #f8fafc; + font: 700 clamp(1.9rem, 3vw, 3rem)/1 "Iowan Old Style", "Palatino Linotype", serif; + } + .subtitle { + margin: 0.55rem 0 0; + color: #cbd5e1; + font: 500 0.95rem/1.5 "Avenir Next", "Segoe UI", sans-serif; + } + </style> + <p class="eyebrow"></p> + <h1></h1> + <p class="subtitle"></p> +`; + +class CoreChatHeroElement extends HTMLElement { + static get observedAttributes(): string[] { + return ['eyebrow', 'title', 'subtitle']; + } + + constructor() { + super(); + const shadowRoot = this.attachShadow({ mode: 'open' }); + shadowRoot.appendChild(template.content.cloneNode(true)); + } + + connectedCallback(): void { + this.render(); + } + + attributeChangedCallback(): void { + this.render(); + } + + private render(): void { + const root = this.shadowRoot; + if (!root) { + return; + } + const eyebrow = root.querySelector('.eyebrow'); + const title = root.querySelector('h1'); + const subtitle = root.querySelector('.subtitle'); + if (eyebrow) eyebrow.textContent = this.getAttribute('eyebrow') || 'CoreGUI Chat'; + if (title) title.textContent = this.getAttribute('title') || 'Local chat'; + if (subtitle) + subtitle.textContent = + this.getAttribute('subtitle') || 'Local-first chat, UI events, and sidecar tooling'; + } +} + +export function registerChatHeroElement(): void { + if (!customElements.get('core-chat-hero')) { + customElements.define('core-chat-hero', CoreChatHeroElement); + } +}