feat(coredeno): Tier 2 bidirectional bridge — Go↔Deno module lifecycle
Wire the CoreDeno sidecar into a fully bidirectional bridge: - Deno→Go (gRPC): Deno connects as CoreService client via polyfilled @grpc/grpc-js over Unix socket. Polyfill patches Deno 2.x http2 gaps (getDefaultSettings, pre-connected socket handling, remoteSettings). - Go→Deno (JSON-RPC): Go connects to Deno's newline-delimited JSON-RPC server for module lifecycle (LoadModule, UnloadModule, ModuleStatus). gRPC server direction avoided due to Deno http2.createServer limitations. - ProcessStart/ProcessStop: gRPC handlers delegate to process.Service with manifest permission gating (run permissions). - Deno runtime: main.ts boots DenoService server, connects CoreService client with retry + health-check round-trip, handles SIGTERM shutdown. 40 unit tests + 2 integration tests (Tier 1 boot + Tier 2 bidirectional). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
2f246ad053
commit
af98accc03
16 changed files with 1177 additions and 30 deletions
|
|
@ -13,12 +13,13 @@ import (
|
|||
|
||||
// Options configures the CoreDeno sidecar.
|
||||
type Options struct {
|
||||
DenoPath string // path to deno binary (default: "deno")
|
||||
SocketPath string // Unix socket path for gRPC
|
||||
AppRoot string // app root directory (sandboxed I/O)
|
||||
StoreDBPath string // SQLite DB path (default: AppRoot/.core/store.db)
|
||||
PublicKey ed25519.PublicKey // ed25519 public key for manifest verification (optional)
|
||||
SidecarArgs []string // args passed to the sidecar process
|
||||
DenoPath string // path to deno binary (default: "deno")
|
||||
SocketPath string // Unix socket path for Go's gRPC server (CoreService)
|
||||
DenoSocketPath string // Unix socket path for Deno's gRPC server (DenoService)
|
||||
AppRoot string // app root directory (sandboxed I/O)
|
||||
StoreDBPath string // SQLite DB path (default: AppRoot/.core/store.db)
|
||||
PublicKey ed25519.PublicKey // ed25519 public key for manifest verification (optional)
|
||||
SidecarArgs []string // args passed to the sidecar process
|
||||
}
|
||||
|
||||
// Permissions declares per-module Deno permission flags.
|
||||
|
|
@ -74,6 +75,9 @@ func NewSidecar(opts Options) *Sidecar {
|
|||
if opts.SocketPath == "" {
|
||||
opts.SocketPath = DefaultSocketPath()
|
||||
}
|
||||
if opts.DenoSocketPath == "" && opts.SocketPath != "" {
|
||||
opts.DenoSocketPath = filepath.Join(filepath.Dir(opts.SocketPath), "deno.sock")
|
||||
}
|
||||
if opts.StoreDBPath == "" && opts.AppRoot != "" {
|
||||
opts.StoreDBPath = filepath.Join(opts.AppRoot, ".core", "store.db")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -80,3 +80,20 @@ func TestDefaultSocketPath_XDG(t *testing.T) {
|
|||
path := DefaultSocketPath()
|
||||
assert.Equal(t, "/run/user/1000/core/deno.sock", path)
|
||||
}
|
||||
|
||||
func TestOptions_DenoSocketPath_Default_Good(t *testing.T) {
|
||||
opts := Options{SocketPath: "/tmp/core/core.sock"}
|
||||
sc := NewSidecar(opts)
|
||||
assert.Equal(t, "/tmp/core/deno.sock", sc.opts.DenoSocketPath,
|
||||
"DenoSocketPath should default to same dir as SocketPath with deno.sock")
|
||||
}
|
||||
|
||||
func TestOptions_DenoSocketPath_Explicit_Good(t *testing.T) {
|
||||
opts := Options{
|
||||
SocketPath: "/tmp/core/core.sock",
|
||||
DenoSocketPath: "/tmp/custom/deno.sock",
|
||||
}
|
||||
sc := NewSidecar(opts)
|
||||
assert.Equal(t, "/tmp/custom/deno.sock", sc.opts.DenoSocketPath,
|
||||
"Explicit DenoSocketPath should not be overridden")
|
||||
}
|
||||
|
|
|
|||
127
pkg/coredeno/denoclient.go
Normal file
127
pkg/coredeno/denoclient.go
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
package coredeno
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// DenoClient communicates with the Deno sidecar's JSON-RPC server over a Unix socket.
|
||||
// Thread-safe: uses a mutex to serialize requests (one connection, request/response protocol).
|
||||
type DenoClient struct {
|
||||
mu sync.Mutex
|
||||
conn net.Conn
|
||||
reader *bufio.Reader
|
||||
}
|
||||
|
||||
// DialDeno connects to the Deno JSON-RPC server on the given Unix socket path.
|
||||
func DialDeno(socketPath string) (*DenoClient, error) {
|
||||
conn, err := net.Dial("unix", socketPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("deno dial: %w", err)
|
||||
}
|
||||
return &DenoClient{
|
||||
conn: conn,
|
||||
reader: bufio.NewReader(conn),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying connection.
|
||||
func (c *DenoClient) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *DenoClient) call(req map[string]any) (map[string]any, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal: %w", err)
|
||||
}
|
||||
data = append(data, '\n')
|
||||
|
||||
if _, err := c.conn.Write(data); err != nil {
|
||||
return nil, fmt.Errorf("write: %w", err)
|
||||
}
|
||||
|
||||
line, err := c.reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read: %w", err)
|
||||
}
|
||||
|
||||
var resp map[string]any
|
||||
if err := json.Unmarshal(line, &resp); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal: %w", err)
|
||||
}
|
||||
|
||||
if errMsg, ok := resp["error"].(string); ok && errMsg != "" {
|
||||
return nil, fmt.Errorf("deno: %s", errMsg)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// LoadModuleResponse holds the result of a LoadModule call.
|
||||
type LoadModuleResponse struct {
|
||||
Ok bool
|
||||
Error string
|
||||
}
|
||||
|
||||
// LoadModule tells Deno to load a module.
|
||||
func (c *DenoClient) LoadModule(code, entryPoint string, permissions []string) (*LoadModuleResponse, error) {
|
||||
resp, err := c.call(map[string]any{
|
||||
"method": "LoadModule",
|
||||
"code": code,
|
||||
"entry_point": entryPoint,
|
||||
"permissions": permissions,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &LoadModuleResponse{
|
||||
Ok: resp["ok"] == true,
|
||||
Error: fmt.Sprint(resp["error"]),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UnloadModuleResponse holds the result of an UnloadModule call.
|
||||
type UnloadModuleResponse struct {
|
||||
Ok bool
|
||||
}
|
||||
|
||||
// UnloadModule tells Deno to unload a module.
|
||||
func (c *DenoClient) UnloadModule(code string) (*UnloadModuleResponse, error) {
|
||||
resp, err := c.call(map[string]any{
|
||||
"method": "UnloadModule",
|
||||
"code": code,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &UnloadModuleResponse{
|
||||
Ok: resp["ok"] == true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ModuleStatusResponse holds the result of a ModuleStatus call.
|
||||
type ModuleStatusResponse struct {
|
||||
Code string
|
||||
Status string
|
||||
}
|
||||
|
||||
// ModuleStatus queries the status of a module in the Deno runtime.
|
||||
func (c *DenoClient) ModuleStatus(code string) (*ModuleStatusResponse, error) {
|
||||
resp, err := c.call(map[string]any{
|
||||
"method": "ModuleStatus",
|
||||
"code": code,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ModuleStatusResponse{
|
||||
Code: fmt.Sprint(resp["code"]),
|
||||
Status: fmt.Sprint(resp["status"]),
|
||||
}, nil
|
||||
}
|
||||
|
|
@ -18,16 +18,34 @@ import (
|
|||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
func TestIntegration_FullBoot_Good(t *testing.T) {
|
||||
// unused import guard
|
||||
var _ = pb.NewCoreServiceClient
|
||||
|
||||
func findDeno(t *testing.T) string {
|
||||
t.Helper()
|
||||
denoPath, err := exec.LookPath("deno")
|
||||
if err != nil {
|
||||
// Check ~/.deno/bin/deno
|
||||
home, _ := os.UserHomeDir()
|
||||
denoPath = filepath.Join(home, ".deno", "bin", "deno")
|
||||
if _, err := os.Stat(denoPath); err != nil {
|
||||
t.Skip("deno not installed")
|
||||
}
|
||||
}
|
||||
return denoPath
|
||||
}
|
||||
|
||||
// runtimeEntryPoint returns the absolute path to runtime/main.ts.
|
||||
func runtimeEntryPoint(t *testing.T) string {
|
||||
t.Helper()
|
||||
// We're in pkg/coredeno/ during test, runtime is a subdir
|
||||
abs, err := filepath.Abs("runtime/main.ts")
|
||||
require.NoError(t, err)
|
||||
require.FileExists(t, abs)
|
||||
return abs
|
||||
}
|
||||
|
||||
func TestIntegration_FullBoot_Good(t *testing.T) {
|
||||
denoPath := findDeno(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
|
|
@ -43,21 +61,14 @@ permissions:
|
|||
read: ["./data/"]
|
||||
`), 0644))
|
||||
|
||||
// Copy the runtime entry point
|
||||
runtimeDir := filepath.Join(coreDir, "runtime")
|
||||
require.NoError(t, os.MkdirAll(runtimeDir, 0755))
|
||||
src, err := os.ReadFile("runtime/main.ts")
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, os.WriteFile(filepath.Join(runtimeDir, "main.ts"), src, 0644))
|
||||
|
||||
entryPoint := filepath.Join(runtimeDir, "main.ts")
|
||||
entryPoint := runtimeEntryPoint(t)
|
||||
|
||||
opts := Options{
|
||||
DenoPath: denoPath,
|
||||
SocketPath: sockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
SidecarArgs: []string{"run", "--allow-env", entryPoint},
|
||||
SidecarArgs: []string{"run", "-A", entryPoint},
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
|
|
@ -68,7 +79,7 @@ permissions:
|
|||
require.NoError(t, err)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = svc.OnStartup(ctx)
|
||||
|
|
@ -108,3 +119,107 @@ permissions:
|
|||
assert.NoError(t, err)
|
||||
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
|
||||
}
|
||||
|
||||
func TestIntegration_Tier2_Bidirectional_Good(t *testing.T) {
|
||||
denoPath := findDeno(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
denoSockPath := filepath.Join(tmpDir, "deno.sock")
|
||||
|
||||
// Write a manifest
|
||||
coreDir := filepath.Join(tmpDir, ".core")
|
||||
require.NoError(t, os.MkdirAll(coreDir, 0755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(`
|
||||
code: tier2-test
|
||||
name: Tier 2 Test
|
||||
version: "1.0"
|
||||
permissions:
|
||||
read: ["./data/"]
|
||||
run: ["echo"]
|
||||
`), 0644))
|
||||
|
||||
entryPoint := runtimeEntryPoint(t)
|
||||
|
||||
opts := Options{
|
||||
DenoPath: denoPath,
|
||||
SocketPath: sockPath,
|
||||
DenoSocketPath: denoSockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
SidecarArgs: []string{"run", "-A", entryPoint},
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(opts)
|
||||
result, err := factory(c)
|
||||
require.NoError(t, err)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = svc.OnStartup(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify both sockets appeared
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(sockPath)
|
||||
return err == nil
|
||||
}, 10*time.Second, 50*time.Millisecond, "core socket should appear")
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(denoSockPath)
|
||||
return err == nil
|
||||
}, 10*time.Second, 50*time.Millisecond, "deno socket should appear")
|
||||
|
||||
// Verify sidecar is running
|
||||
assert.True(t, svc.sidecar.IsRunning(), "Deno sidecar should be running")
|
||||
|
||||
// Verify DenoClient is connected
|
||||
require.NotNil(t, svc.DenoClient(), "DenoClient should be connected")
|
||||
|
||||
// Test Go → Deno: LoadModule
|
||||
loadResp, err := svc.DenoClient().LoadModule("test-module", "/modules/test/main.ts", []string{"read", "net"})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, loadResp.Ok)
|
||||
|
||||
// Test Go → Deno: ModuleStatus
|
||||
statusResp, err := svc.DenoClient().ModuleStatus("test-module")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test-module", statusResp.Code)
|
||||
assert.Equal(t, "RUNNING", statusResp.Status)
|
||||
|
||||
// Test Go → Deno: UnloadModule
|
||||
unloadResp, err := svc.DenoClient().UnloadModule("test-module")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, unloadResp.Ok)
|
||||
|
||||
// Verify module is now STOPPED
|
||||
statusResp2, err := svc.DenoClient().ModuleStatus("test-module")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "STOPPED", statusResp2.Status)
|
||||
|
||||
// Verify CoreService gRPC still works (Deno wrote health check data)
|
||||
conn, err := grpc.NewClient(
|
||||
"unix://"+sockPath,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
coreClient := pb.NewCoreServiceClient(conn)
|
||||
getResp, err := coreClient.StoreGet(ctx, &pb.StoreGetRequest{
|
||||
Group: "_coredeno", Key: "status",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, getResp.Found)
|
||||
assert.Equal(t, "connected", getResp.Value, "Deno should have written health check")
|
||||
|
||||
// Clean shutdown
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,12 +23,17 @@ func (s *Sidecar) Start(ctx context.Context, args ...string) error {
|
|||
return fmt.Errorf("coredeno: mkdir %s: %w", sockDir, err)
|
||||
}
|
||||
|
||||
// Remove stale socket
|
||||
os.Remove(s.opts.SocketPath)
|
||||
// Remove stale Deno socket (the Core socket is managed by ListenGRPC)
|
||||
if s.opts.DenoSocketPath != "" {
|
||||
os.Remove(s.opts.DenoSocketPath)
|
||||
}
|
||||
|
||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||
s.cmd = exec.CommandContext(s.ctx, s.opts.DenoPath, args...)
|
||||
s.cmd.Env = append(os.Environ(), "CORE_SOCKET="+s.opts.SocketPath)
|
||||
s.cmd.Env = append(os.Environ(),
|
||||
"CORE_SOCKET="+s.opts.SocketPath,
|
||||
"DENO_SOCKET="+s.opts.DenoSocketPath,
|
||||
)
|
||||
s.done = make(chan struct{})
|
||||
if err := s.cmd.Start(); err != nil {
|
||||
s.cmd = nil
|
||||
|
|
|
|||
|
|
@ -68,6 +68,42 @@ func TestStart_Good_EnvPassedToChild(t *testing.T) {
|
|||
assert.True(t, found, "child process should receive CORE_SOCKET=%s", sockPath)
|
||||
}
|
||||
|
||||
func TestStart_Good_DenoSocketEnv(t *testing.T) {
|
||||
sockDir := t.TempDir()
|
||||
sockPath := filepath.Join(sockDir, "core.sock")
|
||||
denoSockPath := filepath.Join(sockDir, "deno.sock")
|
||||
|
||||
sc := NewSidecar(Options{
|
||||
DenoPath: "sleep",
|
||||
SocketPath: sockPath,
|
||||
DenoSocketPath: denoSockPath,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := sc.Start(ctx, "10")
|
||||
require.NoError(t, err)
|
||||
defer sc.Stop()
|
||||
|
||||
sc.mu.RLock()
|
||||
env := sc.cmd.Env
|
||||
sc.mu.RUnlock()
|
||||
|
||||
foundCore := false
|
||||
foundDeno := false
|
||||
for _, e := range env {
|
||||
if e == "CORE_SOCKET="+sockPath {
|
||||
foundCore = true
|
||||
}
|
||||
if e == "DENO_SOCKET="+denoSockPath {
|
||||
foundDeno = true
|
||||
}
|
||||
}
|
||||
assert.True(t, foundCore, "child should receive CORE_SOCKET")
|
||||
assert.True(t, foundDeno, "child should receive DENO_SOCKET")
|
||||
}
|
||||
|
||||
func TestSocketDirCreated_Good(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
sockPath := filepath.Join(dir, "sub", "deno.sock")
|
||||
|
|
|
|||
95
pkg/coredeno/runtime/client.ts
Normal file
95
pkg/coredeno/runtime/client.ts
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
// CoreService gRPC client — Deno calls Go for I/O operations.
|
||||
// All filesystem, store, and process operations route through this client.
|
||||
|
||||
import * as grpc from "@grpc/grpc-js";
|
||||
import * as protoLoader from "@grpc/proto-loader";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
const PROTO_PATH = join(__dirname, "..", "proto", "coredeno.proto");
|
||||
|
||||
let packageDef: protoLoader.PackageDefinition | null = null;
|
||||
|
||||
function getProto(): any {
|
||||
if (!packageDef) {
|
||||
packageDef = protoLoader.loadSync(PROTO_PATH, {
|
||||
keepCase: true,
|
||||
longs: String,
|
||||
enums: String,
|
||||
defaults: true,
|
||||
oneofs: true,
|
||||
});
|
||||
}
|
||||
return grpc.loadPackageDefinition(packageDef).coredeno as any;
|
||||
}
|
||||
|
||||
export interface CoreClient {
|
||||
raw: any;
|
||||
storeGet(group: string, key: string): Promise<{ value: string; found: boolean }>;
|
||||
storeSet(group: string, key: string, value: string): Promise<{ ok: boolean }>;
|
||||
fileRead(path: string, moduleCode: string): Promise<{ content: string }>;
|
||||
fileWrite(path: string, content: string, moduleCode: string): Promise<{ ok: boolean }>;
|
||||
fileList(path: string, moduleCode: string): Promise<{ entries: Array<{ name: string; is_dir: boolean; size: number }> }>;
|
||||
fileDelete(path: string, moduleCode: string): Promise<{ ok: boolean }>;
|
||||
processStart(command: string, args: string[], moduleCode: string): Promise<{ process_id: string }>;
|
||||
processStop(processId: string): Promise<{ ok: boolean }>;
|
||||
close(): void;
|
||||
}
|
||||
|
||||
function promisify<T>(client: any, method: string, request: any): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
client[method](request, (err: Error | null, response: T) => {
|
||||
if (err) reject(err);
|
||||
else resolve(response);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function createCoreClient(socketPath: string): CoreClient {
|
||||
const proto = getProto();
|
||||
const client = new proto.CoreService(
|
||||
`unix:${socketPath}`,
|
||||
grpc.credentials.createInsecure(),
|
||||
);
|
||||
|
||||
return {
|
||||
raw: client,
|
||||
|
||||
storeGet(group: string, key: string) {
|
||||
return promisify(client, "StoreGet", { group, key });
|
||||
},
|
||||
|
||||
storeSet(group: string, key: string, value: string) {
|
||||
return promisify(client, "StoreSet", { group, key, value });
|
||||
},
|
||||
|
||||
fileRead(path: string, moduleCode: string) {
|
||||
return promisify(client, "FileRead", { path, module_code: moduleCode });
|
||||
},
|
||||
|
||||
fileWrite(path: string, content: string, moduleCode: string) {
|
||||
return promisify(client, "FileWrite", { path, content, module_code: moduleCode });
|
||||
},
|
||||
|
||||
fileList(path: string, moduleCode: string) {
|
||||
return promisify(client, "FileList", { path, module_code: moduleCode });
|
||||
},
|
||||
|
||||
fileDelete(path: string, moduleCode: string) {
|
||||
return promisify(client, "FileDelete", { path, module_code: moduleCode });
|
||||
},
|
||||
|
||||
processStart(command: string, args: string[], moduleCode: string) {
|
||||
return promisify(client, "ProcessStart", { command, args, module_code: moduleCode });
|
||||
},
|
||||
|
||||
processStop(processId: string) {
|
||||
return promisify(client, "ProcessStop", { process_id: processId });
|
||||
},
|
||||
|
||||
close() {
|
||||
client.close();
|
||||
},
|
||||
};
|
||||
}
|
||||
7
pkg/coredeno/runtime/deno.json
Normal file
7
pkg/coredeno/runtime/deno.json
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
{
|
||||
"imports": {
|
||||
"@grpc/grpc-js": "npm:@grpc/grpc-js@^1.12",
|
||||
"@grpc/proto-loader": "npm:@grpc/proto-loader@^0.7"
|
||||
},
|
||||
"nodeModulesDir": "none"
|
||||
}
|
||||
193
pkg/coredeno/runtime/deno.lock
generated
Normal file
193
pkg/coredeno/runtime/deno.lock
generated
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
{
|
||||
"version": "5",
|
||||
"specifiers": {
|
||||
"npm:@grpc/grpc-js@^1.12.0": "1.14.3",
|
||||
"npm:@grpc/proto-loader@0.7": "0.7.15"
|
||||
},
|
||||
"npm": {
|
||||
"@grpc/grpc-js@1.14.3": {
|
||||
"integrity": "sha512-Iq8QQQ/7X3Sac15oB6p0FmUg/klxQvXLeileoqrTRGJYLV+/9tubbr9ipz0GKHjmXVsgFPo/+W+2cA8eNcR+XA==",
|
||||
"dependencies": [
|
||||
"@grpc/proto-loader@0.8.0",
|
||||
"@js-sdsl/ordered-map"
|
||||
]
|
||||
},
|
||||
"@grpc/proto-loader@0.7.15": {
|
||||
"integrity": "sha512-tMXdRCfYVixjuFK+Hk0Q1s38gV9zDiDJfWL3h1rv4Qc39oILCu1TRTDt7+fGUI8K4G1Fj125Hx/ru3azECWTyQ==",
|
||||
"dependencies": [
|
||||
"lodash.camelcase",
|
||||
"long",
|
||||
"protobufjs",
|
||||
"yargs"
|
||||
],
|
||||
"bin": true
|
||||
},
|
||||
"@grpc/proto-loader@0.8.0": {
|
||||
"integrity": "sha512-rc1hOQtjIWGxcxpb9aHAfLpIctjEnsDehj0DAiVfBlmT84uvR0uUtN2hEi/ecvWVjXUGf5qPF4qEgiLOx1YIMQ==",
|
||||
"dependencies": [
|
||||
"lodash.camelcase",
|
||||
"long",
|
||||
"protobufjs",
|
||||
"yargs"
|
||||
],
|
||||
"bin": true
|
||||
},
|
||||
"@js-sdsl/ordered-map@4.4.2": {
|
||||
"integrity": "sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw=="
|
||||
},
|
||||
"@protobufjs/aspromise@1.1.2": {
|
||||
"integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ=="
|
||||
},
|
||||
"@protobufjs/base64@1.1.2": {
|
||||
"integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg=="
|
||||
},
|
||||
"@protobufjs/codegen@2.0.4": {
|
||||
"integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg=="
|
||||
},
|
||||
"@protobufjs/eventemitter@1.1.0": {
|
||||
"integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q=="
|
||||
},
|
||||
"@protobufjs/fetch@1.1.0": {
|
||||
"integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==",
|
||||
"dependencies": [
|
||||
"@protobufjs/aspromise",
|
||||
"@protobufjs/inquire"
|
||||
]
|
||||
},
|
||||
"@protobufjs/float@1.0.2": {
|
||||
"integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ=="
|
||||
},
|
||||
"@protobufjs/inquire@1.1.0": {
|
||||
"integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q=="
|
||||
},
|
||||
"@protobufjs/path@1.1.2": {
|
||||
"integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA=="
|
||||
},
|
||||
"@protobufjs/pool@1.1.0": {
|
||||
"integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw=="
|
||||
},
|
||||
"@protobufjs/utf8@1.1.0": {
|
||||
"integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw=="
|
||||
},
|
||||
"@types/node@25.2.3": {
|
||||
"integrity": "sha512-m0jEgYlYz+mDJZ2+F4v8D1AyQb+QzsNqRuI7xg1VQX/KlKS0qT9r1Mo16yo5F/MtifXFgaofIFsdFMox2SxIbQ==",
|
||||
"dependencies": [
|
||||
"undici-types"
|
||||
]
|
||||
},
|
||||
"ansi-regex@5.0.1": {
|
||||
"integrity": "sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ=="
|
||||
},
|
||||
"ansi-styles@4.3.0": {
|
||||
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
|
||||
"dependencies": [
|
||||
"color-convert"
|
||||
]
|
||||
},
|
||||
"cliui@8.0.1": {
|
||||
"integrity": "sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==",
|
||||
"dependencies": [
|
||||
"string-width",
|
||||
"strip-ansi",
|
||||
"wrap-ansi"
|
||||
]
|
||||
},
|
||||
"color-convert@2.0.1": {
|
||||
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
|
||||
"dependencies": [
|
||||
"color-name"
|
||||
]
|
||||
},
|
||||
"color-name@1.1.4": {
|
||||
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA=="
|
||||
},
|
||||
"emoji-regex@8.0.0": {
|
||||
"integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A=="
|
||||
},
|
||||
"escalade@3.2.0": {
|
||||
"integrity": "sha512-WUj2qlxaQtO4g6Pq5c29GTcWGDyd8itL8zTlipgECz3JesAiiOKotd8JU6otB3PACgG6xkJUyVhboMS+bje/jA=="
|
||||
},
|
||||
"get-caller-file@2.0.5": {
|
||||
"integrity": "sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg=="
|
||||
},
|
||||
"is-fullwidth-code-point@3.0.0": {
|
||||
"integrity": "sha512-zymm5+u+sCsSWyD9qNaejV3DFvhCKclKdizYaJUuHA83RLjb7nSuGnddCHGv0hk+KY7BMAlsWeK4Ueg6EV6XQg=="
|
||||
},
|
||||
"lodash.camelcase@4.3.0": {
|
||||
"integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA=="
|
||||
},
|
||||
"long@5.3.2": {
|
||||
"integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA=="
|
||||
},
|
||||
"protobufjs@7.5.4": {
|
||||
"integrity": "sha512-CvexbZtbov6jW2eXAvLukXjXUW1TzFaivC46BpWc/3BpcCysb5Vffu+B3XHMm8lVEuy2Mm4XGex8hBSg1yapPg==",
|
||||
"dependencies": [
|
||||
"@protobufjs/aspromise",
|
||||
"@protobufjs/base64",
|
||||
"@protobufjs/codegen",
|
||||
"@protobufjs/eventemitter",
|
||||
"@protobufjs/fetch",
|
||||
"@protobufjs/float",
|
||||
"@protobufjs/inquire",
|
||||
"@protobufjs/path",
|
||||
"@protobufjs/pool",
|
||||
"@protobufjs/utf8",
|
||||
"@types/node",
|
||||
"long"
|
||||
],
|
||||
"scripts": true
|
||||
},
|
||||
"require-directory@2.1.1": {
|
||||
"integrity": "sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q=="
|
||||
},
|
||||
"string-width@4.2.3": {
|
||||
"integrity": "sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==",
|
||||
"dependencies": [
|
||||
"emoji-regex",
|
||||
"is-fullwidth-code-point",
|
||||
"strip-ansi"
|
||||
]
|
||||
},
|
||||
"strip-ansi@6.0.1": {
|
||||
"integrity": "sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==",
|
||||
"dependencies": [
|
||||
"ansi-regex"
|
||||
]
|
||||
},
|
||||
"undici-types@7.16.0": {
|
||||
"integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw=="
|
||||
},
|
||||
"wrap-ansi@7.0.0": {
|
||||
"integrity": "sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==",
|
||||
"dependencies": [
|
||||
"ansi-styles",
|
||||
"string-width",
|
||||
"strip-ansi"
|
||||
]
|
||||
},
|
||||
"y18n@5.0.8": {
|
||||
"integrity": "sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA=="
|
||||
},
|
||||
"yargs-parser@21.1.1": {
|
||||
"integrity": "sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw=="
|
||||
},
|
||||
"yargs@17.7.2": {
|
||||
"integrity": "sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==",
|
||||
"dependencies": [
|
||||
"cliui",
|
||||
"escalade",
|
||||
"get-caller-file",
|
||||
"require-directory",
|
||||
"string-width",
|
||||
"y18n",
|
||||
"yargs-parser"
|
||||
]
|
||||
}
|
||||
},
|
||||
"workspace": {
|
||||
"dependencies": [
|
||||
"npm:@grpc/grpc-js@^1.12.0",
|
||||
"npm:@grpc/proto-loader@0.7"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
@ -2,19 +2,90 @@
|
|||
// Connects to CoreGO via gRPC over Unix socket.
|
||||
// Implements DenoService for module lifecycle management.
|
||||
|
||||
const socketPath = Deno.env.get("CORE_SOCKET");
|
||||
if (!socketPath) {
|
||||
// Must be first import — patches http2 before @grpc/grpc-js loads.
|
||||
import "./polyfill.ts";
|
||||
|
||||
import { createCoreClient, type CoreClient } from "./client.ts";
|
||||
import { startDenoServer, type DenoServer } from "./server.ts";
|
||||
import { ModuleRegistry } from "./modules.ts";
|
||||
|
||||
// Read required environment variables
|
||||
const coreSocket = Deno.env.get("CORE_SOCKET");
|
||||
if (!coreSocket) {
|
||||
console.error("FATAL: CORE_SOCKET environment variable not set");
|
||||
Deno.exit(1);
|
||||
}
|
||||
|
||||
console.error(`CoreDeno: connecting to ${socketPath}`);
|
||||
const denoSocket = Deno.env.get("DENO_SOCKET");
|
||||
if (!denoSocket) {
|
||||
console.error("FATAL: DENO_SOCKET environment variable not set");
|
||||
Deno.exit(1);
|
||||
}
|
||||
|
||||
// Tier 1: signal readiness and stay alive.
|
||||
// Tier 2 adds the gRPC client and DenoService implementation.
|
||||
console.error(`CoreDeno: CORE_SOCKET=${coreSocket}`);
|
||||
console.error(`CoreDeno: DENO_SOCKET=${denoSocket}`);
|
||||
|
||||
// 1. Create module registry
|
||||
const registry = new ModuleRegistry();
|
||||
|
||||
// 2. Start DenoService server (Go calls us here via JSON-RPC over Unix socket)
|
||||
let denoServer: DenoServer;
|
||||
try {
|
||||
denoServer = await startDenoServer(denoSocket, registry);
|
||||
console.error("CoreDeno: DenoService server started");
|
||||
} catch (err) {
|
||||
console.error(`FATAL: failed to start DenoService server: ${err}`);
|
||||
Deno.exit(1);
|
||||
}
|
||||
|
||||
// 3. Connect to CoreService (we call Go here) with retry
|
||||
let coreClient: CoreClient;
|
||||
{
|
||||
coreClient = createCoreClient(coreSocket);
|
||||
const maxRetries = 20;
|
||||
let connected = false;
|
||||
let lastErr: unknown;
|
||||
for (let i = 0; i < maxRetries; i++) {
|
||||
try {
|
||||
const timeoutCall = <T>(p: Promise<T>): Promise<T> =>
|
||||
Promise.race([
|
||||
p,
|
||||
new Promise<T>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("call timeout")), 2000),
|
||||
),
|
||||
]);
|
||||
await timeoutCall(
|
||||
coreClient.storeSet("_coredeno", "status", "connected"),
|
||||
);
|
||||
const resp = await timeoutCall(
|
||||
coreClient.storeGet("_coredeno", "status"),
|
||||
);
|
||||
if (resp.found && resp.value === "connected") {
|
||||
connected = true;
|
||||
break;
|
||||
}
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
if (i < 3 || i === 9 || i === 19) {
|
||||
console.error(`CoreDeno: retry ${i}: ${err}`);
|
||||
}
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 250));
|
||||
}
|
||||
if (!connected) {
|
||||
console.error(
|
||||
`FATAL: failed to connect to CoreService after retries, last error: ${lastErr}`,
|
||||
);
|
||||
denoServer.close();
|
||||
Deno.exit(1);
|
||||
}
|
||||
console.error("CoreDeno: CoreService client connected");
|
||||
}
|
||||
|
||||
// 4. Signal readiness
|
||||
console.error("CoreDeno: ready");
|
||||
|
||||
// Keep alive until parent sends SIGTERM
|
||||
// 5. Keep alive until SIGTERM
|
||||
const ac = new AbortController();
|
||||
Deno.addSignalListener("SIGTERM", () => {
|
||||
console.error("CoreDeno: shutting down");
|
||||
|
|
@ -26,5 +97,7 @@ try {
|
|||
ac.signal.addEventListener("abort", () => reject(new Error("shutdown")));
|
||||
});
|
||||
} catch {
|
||||
// Clean exit on SIGTERM
|
||||
// Clean shutdown
|
||||
coreClient.close();
|
||||
denoServer.close();
|
||||
}
|
||||
|
|
|
|||
50
pkg/coredeno/runtime/modules.ts
Normal file
50
pkg/coredeno/runtime/modules.ts
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
// Module registry — tracks loaded modules and their lifecycle status.
|
||||
// Tier 2: status tracking only. Tier 3 adds real Deno worker isolates.
|
||||
|
||||
export type ModuleStatus = "UNKNOWN" | "LOADING" | "RUNNING" | "STOPPED" | "ERRORED";
|
||||
|
||||
// Status enum values matching the proto definition.
|
||||
export const StatusEnum: Record<ModuleStatus, number> = {
|
||||
UNKNOWN: 0,
|
||||
LOADING: 1,
|
||||
RUNNING: 2,
|
||||
STOPPED: 3,
|
||||
ERRORED: 4,
|
||||
};
|
||||
|
||||
export interface Module {
|
||||
code: string;
|
||||
entryPoint: string;
|
||||
permissions: string[];
|
||||
status: ModuleStatus;
|
||||
}
|
||||
|
||||
export class ModuleRegistry {
|
||||
private modules = new Map<string, Module>();
|
||||
|
||||
load(code: string, entryPoint: string, permissions: string[]): void {
|
||||
this.modules.set(code, {
|
||||
code,
|
||||
entryPoint,
|
||||
permissions,
|
||||
status: "RUNNING",
|
||||
});
|
||||
console.error(`CoreDeno: module loaded: ${code}`);
|
||||
}
|
||||
|
||||
unload(code: string): boolean {
|
||||
const mod = this.modules.get(code);
|
||||
if (!mod) return false;
|
||||
mod.status = "STOPPED";
|
||||
console.error(`CoreDeno: module unloaded: ${code}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
status(code: string): ModuleStatus {
|
||||
return this.modules.get(code)?.status ?? "UNKNOWN";
|
||||
}
|
||||
|
||||
list(): Module[] {
|
||||
return Array.from(this.modules.values());
|
||||
}
|
||||
}
|
||||
94
pkg/coredeno/runtime/polyfill.ts
Normal file
94
pkg/coredeno/runtime/polyfill.ts
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
// Deno http2 + grpc-js polyfill — must be imported BEFORE @grpc/grpc-js.
|
||||
//
|
||||
// Two issues with Deno 2.x node compat:
|
||||
// 1. http2.getDefaultSettings throws "Not implemented"
|
||||
// 2. grpc-js's createConnection returns a socket that reports readyState="open"
|
||||
// but never emits "connect", causing http2 sessions to hang forever.
|
||||
// Fix: wrap createConnection to emit "connect" on next tick for open sockets.
|
||||
|
||||
import http2 from "node:http2";
|
||||
|
||||
// Fix 1: getDefaultSettings stub
|
||||
(http2 as any).getDefaultSettings = () => ({
|
||||
headerTableSize: 4096,
|
||||
enablePush: true,
|
||||
initialWindowSize: 65535,
|
||||
maxFrameSize: 16384,
|
||||
maxConcurrentStreams: 0xffffffff,
|
||||
maxHeaderListSize: 65535,
|
||||
maxHeaderSize: 65535,
|
||||
enableConnectProtocol: false,
|
||||
});
|
||||
|
||||
// Fix 2: grpc-js (transport.js line 536) passes an already-connected socket
|
||||
// to http2.connect via createConnection. Deno's http2 never completes the
|
||||
// HTTP/2 handshake because it expects a "connect" event from the socket,
|
||||
// which already fired. Emitting "connect" again causes "Busy: Unix socket
|
||||
// is currently in use" in Deno's internal http2.
|
||||
//
|
||||
// Workaround: track Unix socket paths via net.connect intercept, then in
|
||||
// createConnection, return a FRESH socket. Keep the original socket alive
|
||||
// (grpc-js has close listeners on it) but unused for data.
|
||||
import net from "node:net";
|
||||
|
||||
const socketPathMap = new WeakMap<net.Socket, string>();
|
||||
const origNetConnect = net.connect;
|
||||
(net as any).connect = function (...args: any[]) {
|
||||
const sock = origNetConnect.apply(this, args as any);
|
||||
if (args[0] && typeof args[0] === "object" && args[0].path) {
|
||||
socketPathMap.set(sock, args[0].path);
|
||||
}
|
||||
return sock;
|
||||
};
|
||||
|
||||
// Fix 3: Deno's http2 client never fires "remoteSettings" event, which
|
||||
// grpc-js waits for before marking the transport as READY.
|
||||
// Workaround: emit "remoteSettings" after "connect" with reasonable defaults.
|
||||
const origConnect = http2.connect;
|
||||
(http2 as any).connect = function (
|
||||
authority: any,
|
||||
options: any,
|
||||
...rest: any[]
|
||||
) {
|
||||
// For Unix sockets: replace pre-connected socket with fresh one
|
||||
if (options?.createConnection) {
|
||||
const origCC = options.createConnection;
|
||||
options = {
|
||||
...options,
|
||||
createConnection(...ccArgs: any[]) {
|
||||
const origSock = origCC.apply(this, ccArgs);
|
||||
const unixPath = socketPathMap.get(origSock);
|
||||
if (
|
||||
unixPath &&
|
||||
!origSock.connecting &&
|
||||
origSock.readyState === "open"
|
||||
) {
|
||||
const freshSock = net.connect({ path: unixPath });
|
||||
freshSock.on("close", () => origSock.destroy());
|
||||
return freshSock;
|
||||
}
|
||||
return origSock;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const session = origConnect.call(this, authority, options, ...rest);
|
||||
|
||||
// Emit remoteSettings after connect — Deno's http2 doesn't emit it
|
||||
session.once("connect", () => {
|
||||
if (!session.destroyed && !session.closed) {
|
||||
const settings = {
|
||||
headerTableSize: 4096,
|
||||
enablePush: false,
|
||||
initialWindowSize: 65535,
|
||||
maxFrameSize: 16384,
|
||||
maxConcurrentStreams: 100,
|
||||
maxHeaderListSize: 8192,
|
||||
maxHeaderSize: 8192,
|
||||
};
|
||||
process.nextTick(() => session.emit("remoteSettings", settings));
|
||||
}
|
||||
});
|
||||
|
||||
return session;
|
||||
};
|
||||
124
pkg/coredeno/runtime/server.ts
Normal file
124
pkg/coredeno/runtime/server.ts
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
// DenoService JSON-RPC server — Go calls Deno for module lifecycle management.
|
||||
// Uses length-prefixed JSON over raw Unix socket (Deno's http2 server is broken).
|
||||
// Protocol: 4-byte big-endian length + JSON payload, newline-delimited.
|
||||
|
||||
import { ModuleRegistry } from "./modules.ts";
|
||||
|
||||
export interface DenoServer {
|
||||
close(): void;
|
||||
}
|
||||
|
||||
export async function startDenoServer(
|
||||
socketPath: string,
|
||||
registry: ModuleRegistry,
|
||||
): Promise<DenoServer> {
|
||||
// Remove stale socket
|
||||
try {
|
||||
Deno.removeSync(socketPath);
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
|
||||
const listener = Deno.listen({ transport: "unix", path: socketPath });
|
||||
|
||||
const handleConnection = async (conn: Deno.UnixConn) => {
|
||||
const reader = conn.readable.getReader();
|
||||
const writer = conn.writable.getWriter();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
|
||||
// Process complete lines (newline-delimited JSON)
|
||||
let newlineIdx: number;
|
||||
while ((newlineIdx = buffer.indexOf("\n")) !== -1) {
|
||||
const line = buffer.slice(0, newlineIdx);
|
||||
buffer = buffer.slice(newlineIdx + 1);
|
||||
|
||||
if (!line.trim()) continue;
|
||||
|
||||
try {
|
||||
const req = JSON.parse(line);
|
||||
const resp = dispatch(req, registry);
|
||||
await writer.write(
|
||||
new TextEncoder().encode(JSON.stringify(resp) + "\n"),
|
||||
);
|
||||
} catch (err) {
|
||||
const errResp = {
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
};
|
||||
await writer.write(
|
||||
new TextEncoder().encode(JSON.stringify(errResp) + "\n"),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Connection closed or error — expected during shutdown
|
||||
} finally {
|
||||
try {
|
||||
writer.close();
|
||||
} catch {
|
||||
/* already closed */
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Accept connections in background
|
||||
const abortController = new AbortController();
|
||||
(async () => {
|
||||
try {
|
||||
for await (const conn of listener) {
|
||||
if (abortController.signal.aborted) break;
|
||||
handleConnection(conn);
|
||||
}
|
||||
} catch {
|
||||
// Listener closed
|
||||
}
|
||||
})();
|
||||
|
||||
return {
|
||||
close() {
|
||||
abortController.abort();
|
||||
listener.close();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
interface RPCRequest {
|
||||
method: string;
|
||||
code?: string;
|
||||
entry_point?: string;
|
||||
permissions?: string[];
|
||||
process_id?: string;
|
||||
}
|
||||
|
||||
function dispatch(
|
||||
req: RPCRequest,
|
||||
registry: ModuleRegistry,
|
||||
): Record<string, unknown> {
|
||||
switch (req.method) {
|
||||
case "LoadModule": {
|
||||
registry.load(
|
||||
req.code ?? "",
|
||||
req.entry_point ?? "",
|
||||
req.permissions ?? [],
|
||||
);
|
||||
return { ok: true, error: "" };
|
||||
}
|
||||
case "UnloadModule": {
|
||||
const ok = registry.unload(req.code ?? "");
|
||||
return { ok };
|
||||
}
|
||||
case "ModuleStatus": {
|
||||
return { code: req.code, status: registry.status(req.code ?? "") };
|
||||
}
|
||||
default:
|
||||
return { error: `unknown method: ${req.method}` };
|
||||
}
|
||||
}
|
||||
|
|
@ -8,8 +8,27 @@ import (
|
|||
"forge.lthn.ai/core/go/pkg/io"
|
||||
"forge.lthn.ai/core/go/pkg/manifest"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ProcessRunner abstracts process management for the gRPC server.
|
||||
// Satisfied by *process.Service.
|
||||
type ProcessRunner interface {
|
||||
Start(ctx context.Context, command string, args ...string) (ProcessHandle, error)
|
||||
Kill(id string) error
|
||||
}
|
||||
|
||||
// ProcessHandle is returned by ProcessRunner.Start.
|
||||
type ProcessHandle interface {
|
||||
Info() ProcessInfo
|
||||
}
|
||||
|
||||
// ProcessInfo is the subset of process info the server needs.
|
||||
type ProcessInfo struct {
|
||||
ID string
|
||||
}
|
||||
|
||||
// Server implements the CoreService gRPC interface with permission gating.
|
||||
// Every I/O request is checked against the calling module's declared permissions.
|
||||
type Server struct {
|
||||
|
|
@ -17,6 +36,7 @@ type Server struct {
|
|||
medium io.Medium
|
||||
store *store.Store
|
||||
manifests map[string]*manifest.Manifest
|
||||
processes ProcessRunner
|
||||
}
|
||||
|
||||
// NewServer creates a CoreService server backed by the given Medium and Store.
|
||||
|
|
@ -129,3 +149,38 @@ func (s *Server) StoreSet(_ context.Context, req *pb.StoreSetRequest) (*pb.Store
|
|||
}
|
||||
return &pb.StoreSetResponse{Ok: true}, nil
|
||||
}
|
||||
|
||||
// SetProcessRunner sets the process runner for ProcessStart/ProcessStop.
|
||||
func (s *Server) SetProcessRunner(pr ProcessRunner) {
|
||||
s.processes = pr
|
||||
}
|
||||
|
||||
// ProcessStart implements CoreService.ProcessStart with permission gating.
|
||||
func (s *Server) ProcessStart(ctx context.Context, req *pb.ProcessStartRequest) (*pb.ProcessStartResponse, error) {
|
||||
if s.processes == nil {
|
||||
return nil, status.Error(codes.Unimplemented, "process service not available")
|
||||
}
|
||||
m, err := s.getManifest(req.ModuleCode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !CheckRun(req.Command, m.Permissions.Run) {
|
||||
return nil, fmt.Errorf("permission denied: %s cannot run %s", req.ModuleCode, req.Command)
|
||||
}
|
||||
proc, err := s.processes.Start(ctx, req.Command, req.Args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("process start: %w", err)
|
||||
}
|
||||
return &pb.ProcessStartResponse{ProcessId: proc.Info().ID}, nil
|
||||
}
|
||||
|
||||
// ProcessStop implements CoreService.ProcessStop.
|
||||
func (s *Server) ProcessStop(_ context.Context, req *pb.ProcessStopRequest) (*pb.ProcessStopResponse, error) {
|
||||
if s.processes == nil {
|
||||
return nil, status.Error(codes.Unimplemented, "process service not available")
|
||||
}
|
||||
if err := s.processes.Kill(req.ProcessId); err != nil {
|
||||
return nil, fmt.Errorf("process stop: %w", err)
|
||||
}
|
||||
return &pb.ProcessStopResponse{Ok: true}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,16 +2,48 @@ package coredeno
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
"forge.lthn.ai/core/go/pkg/io"
|
||||
"forge.lthn.ai/core/go/pkg/manifest"
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// mockProcessRunner implements ProcessRunner for testing.
|
||||
type mockProcessRunner struct {
|
||||
started map[string]bool
|
||||
nextID int
|
||||
}
|
||||
|
||||
func newMockProcessRunner() *mockProcessRunner {
|
||||
return &mockProcessRunner{started: make(map[string]bool)}
|
||||
}
|
||||
|
||||
func (m *mockProcessRunner) Start(_ context.Context, command string, args ...string) (ProcessHandle, error) {
|
||||
m.nextID++
|
||||
id := fmt.Sprintf("proc-%d", m.nextID)
|
||||
m.started[id] = true
|
||||
return &mockProcessHandle{id: id}, nil
|
||||
}
|
||||
|
||||
func (m *mockProcessRunner) Kill(id string) error {
|
||||
if !m.started[id] {
|
||||
return fmt.Errorf("process not found: %s", id)
|
||||
}
|
||||
delete(m.started, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
type mockProcessHandle struct{ id string }
|
||||
|
||||
func (h *mockProcessHandle) Info() ProcessInfo { return ProcessInfo{ID: h.id} }
|
||||
|
||||
func newTestServer(t *testing.T) *Server {
|
||||
t.Helper()
|
||||
medium := io.NewMockMedium()
|
||||
|
|
@ -95,3 +127,74 @@ func TestStoreGet_Good_NotFound(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
assert.False(t, resp.Found)
|
||||
}
|
||||
|
||||
func newTestServerWithProcess(t *testing.T) (*Server, *mockProcessRunner) {
|
||||
t.Helper()
|
||||
srv := newTestServer(t)
|
||||
srv.RegisterModule(&manifest.Manifest{
|
||||
Code: "runner-mod",
|
||||
Permissions: manifest.Permissions{
|
||||
Run: []string{"echo", "ls"},
|
||||
},
|
||||
})
|
||||
pr := newMockProcessRunner()
|
||||
srv.SetProcessRunner(pr)
|
||||
return srv, pr
|
||||
}
|
||||
|
||||
func TestProcessStart_Good(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
resp, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "echo", Args: []string{"hello"}, ModuleCode: "runner-mod",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, resp.ProcessId)
|
||||
}
|
||||
|
||||
func TestProcessStart_Bad_PermissionDenied(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
_, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "rm", Args: []string{"-rf", "/"}, ModuleCode: "runner-mod",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "permission denied")
|
||||
}
|
||||
|
||||
func TestProcessStart_Bad_NoProcessService(t *testing.T) {
|
||||
srv := newTestServer(t)
|
||||
srv.RegisterModule(&manifest.Manifest{
|
||||
Code: "no-proc-mod",
|
||||
Permissions: manifest.Permissions{Run: []string{"echo"}},
|
||||
})
|
||||
_, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "echo", ModuleCode: "no-proc-mod",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
st, ok := status.FromError(err)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, codes.Unimplemented, st.Code())
|
||||
}
|
||||
|
||||
func TestProcessStop_Good(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
// Start a process first
|
||||
startResp, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "echo", ModuleCode: "runner-mod",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop it
|
||||
resp, err := srv.ProcessStop(context.Background(), &pb.ProcessStopRequest{
|
||||
ProcessId: startResp.ProcessId,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, resp.Ok)
|
||||
}
|
||||
|
||||
func TestProcessStop_Bad_NotFound(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
_, err := srv.ProcessStop(context.Background(), &pb.ProcessStopRequest{
|
||||
ProcessId: "nonexistent",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,8 @@ package coredeno
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
core "forge.lthn.ai/core/go/pkg/framework/core"
|
||||
"forge.lthn.ai/core/go/pkg/io"
|
||||
|
|
@ -23,6 +25,7 @@ type Service struct {
|
|||
store *store.Store
|
||||
grpcCancel context.CancelFunc
|
||||
grpcDone chan error
|
||||
denoClient *DenoClient
|
||||
}
|
||||
|
||||
// NewServiceFactory returns a factory function for framework registration via WithService.
|
||||
|
|
@ -91,9 +94,26 @@ func (s *Service) OnStartup(ctx context.Context) error {
|
|||
|
||||
// 6. Start sidecar (if args provided)
|
||||
if len(opts.SidecarArgs) > 0 {
|
||||
// Wait for core socket so sidecar can connect to our gRPC server
|
||||
if err := waitForSocket(ctx, opts.SocketPath, 5*time.Second); err != nil {
|
||||
return fmt.Errorf("coredeno: core socket: %w", err)
|
||||
}
|
||||
|
||||
if err := s.sidecar.Start(ctx, opts.SidecarArgs...); err != nil {
|
||||
return fmt.Errorf("coredeno: sidecar: %w", err)
|
||||
}
|
||||
|
||||
// 7. Wait for Deno's server and connect as client
|
||||
if opts.DenoSocketPath != "" {
|
||||
if err := waitForSocket(ctx, opts.DenoSocketPath, 10*time.Second); err != nil {
|
||||
return fmt.Errorf("coredeno: deno socket: %w", err)
|
||||
}
|
||||
dc, err := DialDeno(opts.DenoSocketPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("coredeno: deno client: %w", err)
|
||||
}
|
||||
s.denoClient = dc
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
@ -101,7 +121,12 @@ func (s *Service) OnStartup(ctx context.Context) error {
|
|||
|
||||
// OnShutdown stops the CoreDeno subsystem. Called by the framework on app shutdown.
|
||||
func (s *Service) OnShutdown(_ context.Context) error {
|
||||
// Stop sidecar first
|
||||
// Close Deno client connection
|
||||
if s.denoClient != nil {
|
||||
s.denoClient.Close()
|
||||
}
|
||||
|
||||
// Stop sidecar
|
||||
_ = s.sidecar.Stop()
|
||||
|
||||
// Stop gRPC listener
|
||||
|
|
@ -127,3 +152,27 @@ func (s *Service) Sidecar() *Sidecar {
|
|||
func (s *Service) GRPCServer() *Server {
|
||||
return s.grpcServer
|
||||
}
|
||||
|
||||
// DenoClient returns the DenoService client for calling the Deno sidecar.
|
||||
// Returns nil if the sidecar was not started or has no DenoSocketPath.
|
||||
func (s *Service) DenoClient() *DenoClient {
|
||||
return s.denoClient
|
||||
}
|
||||
|
||||
// waitForSocket polls until a Unix socket file appears or the context/timeout expires.
|
||||
func waitForSocket(ctx context.Context, path string, timeout time.Duration) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
for {
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
return nil
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return fmt.Errorf("timeout waiting for socket %s", path)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue