diff --git a/pkg/mcp/register.go b/pkg/mcp/register.go index 1c8163e..5bd4b95 100644 --- a/pkg/mcp/register.go +++ b/pkg/mcp/register.go @@ -107,6 +107,29 @@ func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { switch ev := msg.(type) { case ChannelPush: s.ChannelSend(ctx, ev.Channel, ev.Data) + case process.ActionProcessStarted: + s.ChannelSend(ctx, ChannelProcessStart, map[string]any{ + "id": ev.ID, + "command": ev.Command, + "args": ev.Args, + "dir": ev.Dir, + "pid": ev.PID, + }) + case process.ActionProcessExited: + payload := map[string]any{ + "id": ev.ID, + "exitCode": ev.ExitCode, + "duration": ev.Duration, + } + if ev.Error != nil { + payload["error"] = ev.Error.Error() + } + s.ChannelSend(ctx, ChannelProcessExit, payload) + case process.ActionProcessKilled: + s.ChannelSend(ctx, ChannelProcessExit, map[string]any{ + "id": ev.ID, + "signal": ev.Signal, + }) } return core.Result{OK: true} } diff --git a/pkg/mcp/register_test.go b/pkg/mcp/register_test.go index ebc4772..d4e4b8a 100644 --- a/pkg/mcp/register_test.go +++ b/pkg/mcp/register_test.go @@ -1,7 +1,12 @@ package mcp import ( + "bufio" + "context" + "encoding/json" + "net" "testing" + "time" "dappco.re/go/core" "forge.lthn.ai/core/go-process" @@ -49,3 +54,90 @@ func TestRegister_Good_WiresOptionalServices(t *testing.T) { t.Fatal("expected ws tools to be registered when ws hub is available") } } + +func TestHandleIPCEvents_Good_ForwardsProcessActions(t *testing.T) { + svc, err := New(Options{}) + if err != nil { + t.Fatalf("New() failed: %v", err) + } + + serverConn, clientConn := net.Pipe() + defer clientConn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + session, err := svc.server.Connect(ctx, &connTransport{conn: serverConn}, nil) + if err != nil { + t.Fatalf("Connect() failed: %v", err) + } + defer session.Close() + + clientConn.SetDeadline(time.Now().Add(5 * time.Second)) + scanner := bufio.NewScanner(clientConn) + scanner.Buffer(make([]byte, 64*1024), 10*1024*1024) + received := make(chan map[string]any, 8) + errCh := make(chan error, 1) + go func() { + for scanner.Scan() { + var msg map[string]any + if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil { + errCh <- err + return + } + received <- msg + } + if err := scanner.Err(); err != nil { + errCh <- err + return + } + close(received) + }() + + result := svc.HandleIPCEvents(nil, process.ActionProcessStarted{ + ID: "proc-1", + Command: "go", + Args: []string{"test", "./..."}, + Dir: "/workspace", + PID: 1234, + }) + if !result.OK { + t.Fatalf("HandleIPCEvents() returned non-OK result: %#v", result.Value) + } + + deadline := time.NewTimer(5 * time.Second) + defer deadline.Stop() + + for { + select { + case err := <-errCh: + t.Fatalf("failed to read notification: %v", err) + case msg, ok := <-received: + if !ok { + t.Fatal("notification stream closed before expected message arrived") + } + if msg["method"] != channelNotificationMethod { + continue + } + + params, ok := msg["params"].(map[string]any) + if !ok { + t.Fatalf("expected params object, got %T", msg["params"]) + } + if params["channel"] != ChannelProcessStart { + continue + } + + payload, ok := params["data"].(map[string]any) + if !ok { + t.Fatalf("expected data object, got %T", params["data"]) + } + if payload["id"] != "proc-1" || payload["command"] != "go" { + t.Fatalf("unexpected payload: %#v", payload) + } + return + case <-deadline.C: + t.Fatal("timed out waiting for process start notification") + } + } +}