feat(mcp): forward process lifecycle actions to channel notifications

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-04-02 11:53:44 +00:00
parent d57f9d4039
commit 2df8866404
2 changed files with 115 additions and 0 deletions

View file

@ -107,6 +107,29 @@ func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
switch ev := msg.(type) {
case ChannelPush:
s.ChannelSend(ctx, ev.Channel, ev.Data)
case process.ActionProcessStarted:
s.ChannelSend(ctx, ChannelProcessStart, map[string]any{
"id": ev.ID,
"command": ev.Command,
"args": ev.Args,
"dir": ev.Dir,
"pid": ev.PID,
})
case process.ActionProcessExited:
payload := map[string]any{
"id": ev.ID,
"exitCode": ev.ExitCode,
"duration": ev.Duration,
}
if ev.Error != nil {
payload["error"] = ev.Error.Error()
}
s.ChannelSend(ctx, ChannelProcessExit, payload)
case process.ActionProcessKilled:
s.ChannelSend(ctx, ChannelProcessExit, map[string]any{
"id": ev.ID,
"signal": ev.Signal,
})
}
return core.Result{OK: true}
}

View file

@ -1,7 +1,12 @@
package mcp
import (
"bufio"
"context"
"encoding/json"
"net"
"testing"
"time"
"dappco.re/go/core"
"forge.lthn.ai/core/go-process"
@ -49,3 +54,90 @@ func TestRegister_Good_WiresOptionalServices(t *testing.T) {
t.Fatal("expected ws tools to be registered when ws hub is available")
}
}
func TestHandleIPCEvents_Good_ForwardsProcessActions(t *testing.T) {
svc, err := New(Options{})
if err != nil {
t.Fatalf("New() failed: %v", err)
}
serverConn, clientConn := net.Pipe()
defer clientConn.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
session, err := svc.server.Connect(ctx, &connTransport{conn: serverConn}, nil)
if err != nil {
t.Fatalf("Connect() failed: %v", err)
}
defer session.Close()
clientConn.SetDeadline(time.Now().Add(5 * time.Second))
scanner := bufio.NewScanner(clientConn)
scanner.Buffer(make([]byte, 64*1024), 10*1024*1024)
received := make(chan map[string]any, 8)
errCh := make(chan error, 1)
go func() {
for scanner.Scan() {
var msg map[string]any
if err := json.Unmarshal(scanner.Bytes(), &msg); err != nil {
errCh <- err
return
}
received <- msg
}
if err := scanner.Err(); err != nil {
errCh <- err
return
}
close(received)
}()
result := svc.HandleIPCEvents(nil, process.ActionProcessStarted{
ID: "proc-1",
Command: "go",
Args: []string{"test", "./..."},
Dir: "/workspace",
PID: 1234,
})
if !result.OK {
t.Fatalf("HandleIPCEvents() returned non-OK result: %#v", result.Value)
}
deadline := time.NewTimer(5 * time.Second)
defer deadline.Stop()
for {
select {
case err := <-errCh:
t.Fatalf("failed to read notification: %v", err)
case msg, ok := <-received:
if !ok {
t.Fatal("notification stream closed before expected message arrived")
}
if msg["method"] != channelNotificationMethod {
continue
}
params, ok := msg["params"].(map[string]any)
if !ok {
t.Fatalf("expected params object, got %T", msg["params"])
}
if params["channel"] != ChannelProcessStart {
continue
}
payload, ok := params["data"].(map[string]any)
if !ok {
t.Fatalf("expected data object, got %T", params["data"])
}
if payload["id"] != "proc-1" || payload["command"] != "go" {
t.Fatalf("unexpected payload: %#v", payload)
}
return
case <-deadline.C:
t.Fatal("timed out waiting for process start notification")
}
}
}