feat(coredeno): Tier 3 Worker isolation — sandboxed module loading with I/O bridge

Each module now runs in a real Deno Worker with per-module permission
sandboxing. The I/O bridge relays Worker postMessage calls through the
parent to CoreService gRPC, so modules can access store, files, and
processes without direct network/filesystem access.

- Worker bootstrap (worker-entry.ts): sets up RPC bridge, dynamically
  imports module, calls init(core) with typed I/O object
- ModuleRegistry rewritten: creates Workers with Deno permission
  constructor, handles LOADING → RUNNING → STOPPED lifecycle
- Structured ModulePermissions (read/write/net/run) replaces flat
  string array in Go→Deno JSON-RPC
- I/O bridge: Worker postMessage → parent dispatchRPC → CoreClient
  gRPC → response relayed back to Worker
- Test module proves end-to-end: Worker calls core.storeSet() →
  Go verifies value in store

40 unit tests + 3 integration tests (Tier 1 boot + Tier 2 bidir + Tier 3 Worker).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude 2026-02-18 00:48:16 +00:00
parent af98accc03
commit ad6a466459
No known key found for this signature in database
GPG key ID: AF404715446AEB41
8 changed files with 396 additions and 32 deletions

View file

@ -63,19 +63,27 @@ func (c *DenoClient) call(req map[string]any) (map[string]any, error) {
return resp, nil
}
// ModulePermissions declares per-module permission scopes for Deno Worker sandboxing.
type ModulePermissions struct {
Read []string `json:"read,omitempty"`
Write []string `json:"write,omitempty"`
Net []string `json:"net,omitempty"`
Run []string `json:"run,omitempty"`
}
// LoadModuleResponse holds the result of a LoadModule call.
type LoadModuleResponse struct {
Ok bool
Error string
}
// LoadModule tells Deno to load a module.
func (c *DenoClient) LoadModule(code, entryPoint string, permissions []string) (*LoadModuleResponse, error) {
// LoadModule tells Deno to load a module with the given permissions.
func (c *DenoClient) LoadModule(code, entryPoint string, perms ModulePermissions) (*LoadModuleResponse, error) {
resp, err := c.call(map[string]any{
"method": "LoadModule",
"code": code,
"entry_point": entryPoint,
"permissions": permissions,
"permissions": perms,
})
if err != nil {
return nil, err

View file

@ -44,6 +44,15 @@ func runtimeEntryPoint(t *testing.T) string {
return abs
}
// testModulePath returns the absolute path to runtime/testdata/test-module.ts.
func testModulePath(t *testing.T) string {
t.Helper()
abs, err := filepath.Abs("runtime/testdata/test-module.ts")
require.NoError(t, err)
require.FileExists(t, abs)
return abs
}
func TestIntegration_FullBoot_Good(t *testing.T) {
denoPath := findDeno(t)
@ -147,7 +156,7 @@ permissions:
DenoSocketPath: denoSockPath,
AppRoot: tmpDir,
StoreDBPath: ":memory:",
SidecarArgs: []string{"run", "-A", entryPoint},
SidecarArgs: []string{"run", "-A", "--unstable-worker-options", entryPoint},
}
c, err := core.New()
@ -181,12 +190,20 @@ permissions:
// Verify DenoClient is connected
require.NotNil(t, svc.DenoClient(), "DenoClient should be connected")
// Test Go → Deno: LoadModule
loadResp, err := svc.DenoClient().LoadModule("test-module", "/modules/test/main.ts", []string{"read", "net"})
// Test Go → Deno: LoadModule with real Worker
modPath := testModulePath(t)
loadResp, err := svc.DenoClient().LoadModule("test-module", modPath, ModulePermissions{
Read: []string{filepath.Dir(modPath) + "/"},
})
require.NoError(t, err)
assert.True(t, loadResp.Ok)
// Test Go → Deno: ModuleStatus
// Wait for module to finish loading (async Worker init)
require.Eventually(t, func() bool {
resp, err := svc.DenoClient().ModuleStatus("test-module")
return err == nil && (resp.Status == "RUNNING" || resp.Status == "ERRORED")
}, 5*time.Second, 50*time.Millisecond, "module should finish loading")
statusResp, err := svc.DenoClient().ModuleStatus("test-module")
require.NoError(t, err)
assert.Equal(t, "test-module", statusResp.Code)
@ -223,3 +240,102 @@ permissions:
assert.NoError(t, err)
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
}
func TestIntegration_Tier3_WorkerIsolation_Good(t *testing.T) {
denoPath := findDeno(t)
tmpDir := t.TempDir()
sockPath := filepath.Join(tmpDir, "core.sock")
denoSockPath := filepath.Join(tmpDir, "deno.sock")
// Write a manifest
coreDir := filepath.Join(tmpDir, ".core")
require.NoError(t, os.MkdirAll(coreDir, 0755))
require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(`
code: tier3-test
name: Tier 3 Test
version: "1.0"
permissions:
read: ["./data/"]
`), 0644))
entryPoint := runtimeEntryPoint(t)
modPath := testModulePath(t)
opts := Options{
DenoPath: denoPath,
SocketPath: sockPath,
DenoSocketPath: denoSockPath,
AppRoot: tmpDir,
StoreDBPath: ":memory:",
SidecarArgs: []string{"run", "-A", "--unstable-worker-options", entryPoint},
}
c, err := core.New()
require.NoError(t, err)
factory := NewServiceFactory(opts)
result, err := factory(c)
require.NoError(t, err)
svc := result.(*Service)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err = svc.OnStartup(ctx)
require.NoError(t, err)
// Verify both sockets appeared
require.Eventually(t, func() bool {
_, err := os.Stat(denoSockPath)
return err == nil
}, 10*time.Second, 50*time.Millisecond, "deno socket should appear")
require.NotNil(t, svc.DenoClient(), "DenoClient should be connected")
// Load a real module — it writes to store via I/O bridge
loadResp, err := svc.DenoClient().LoadModule("test-mod", modPath, ModulePermissions{
Read: []string{filepath.Dir(modPath) + "/"},
})
require.NoError(t, err)
assert.True(t, loadResp.Ok)
// Wait for module to reach RUNNING (Worker init + init() completes)
require.Eventually(t, func() bool {
resp, err := svc.DenoClient().ModuleStatus("test-mod")
return err == nil && resp.Status == "RUNNING"
}, 10*time.Second, 100*time.Millisecond, "module should be RUNNING")
// Verify the module wrote to the store via the I/O bridge
// Module calls: core.storeSet("test-module", "init", "ok")
conn, err := grpc.NewClient(
"unix://"+sockPath,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
defer conn.Close()
coreClient := pb.NewCoreServiceClient(conn)
// Poll for the store value — module init is async
require.Eventually(t, func() bool {
resp, err := coreClient.StoreGet(ctx, &pb.StoreGetRequest{
Group: "test-module", Key: "init",
})
return err == nil && resp.Found && resp.Value == "ok"
}, 5*time.Second, 100*time.Millisecond, "module should have written to store via I/O bridge")
// Unload and verify
unloadResp, err := svc.DenoClient().UnloadModule("test-mod")
require.NoError(t, err)
assert.True(t, unloadResp.Ok)
statusResp, err := svc.DenoClient().ModuleStatus("test-mod")
require.NoError(t, err)
assert.Equal(t, "STOPPED", statusResp.Status)
// Clean shutdown
err = svc.OnShutdown(context.Background())
assert.NoError(t, err)
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
}

View file

@ -3,5 +3,6 @@
"@grpc/grpc-js": "npm:@grpc/grpc-js@^1.12",
"@grpc/proto-loader": "npm:@grpc/proto-loader@^0.7"
},
"nodeModulesDir": "none"
"nodeModulesDir": "none",
"unstable": ["worker-options"]
}

View file

@ -82,10 +82,13 @@ let coreClient: CoreClient;
console.error("CoreDeno: CoreService client connected");
}
// 4. Signal readiness
// 4. Inject CoreClient into registry for I/O bridge
registry.setCoreClient(coreClient);
// 5. Signal readiness
console.error("CoreDeno: ready");
// 5. Keep alive until SIGTERM
// 6. Keep alive until SIGTERM
const ac = new AbortController();
Deno.addSignalListener("SIGTERM", () => {
console.error("CoreDeno: shutting down");

View file

@ -1,40 +1,189 @@
// Module registry — tracks loaded modules and their lifecycle status.
// Tier 2: status tracking only. Tier 3 adds real Deno worker isolates.
// Module registry — manages module lifecycle with Deno Worker isolation.
// Each module runs in its own Worker with per-module permission sandboxing.
// I/O bridge relays Worker postMessage calls to CoreService gRPC.
export type ModuleStatus = "UNKNOWN" | "LOADING" | "RUNNING" | "STOPPED" | "ERRORED";
import type { CoreClient } from "./client.ts";
// Status enum values matching the proto definition.
export const StatusEnum: Record<ModuleStatus, number> = {
UNKNOWN: 0,
LOADING: 1,
RUNNING: 2,
STOPPED: 3,
ERRORED: 4,
};
export type ModuleStatus =
| "UNKNOWN"
| "LOADING"
| "RUNNING"
| "STOPPED"
| "ERRORED";
export interface Module {
export interface ModulePermissions {
read?: string[];
write?: string[];
net?: string[];
run?: string[];
}
interface Module {
code: string;
entryPoint: string;
permissions: string[];
permissions: ModulePermissions;
status: ModuleStatus;
worker?: Worker;
}
export class ModuleRegistry {
private modules = new Map<string, Module>();
private coreClient: CoreClient | null = null;
private workerEntryUrl: string;
load(code: string, entryPoint: string, permissions: string[]): void {
this.modules.set(code, {
constructor() {
this.workerEntryUrl = new URL("./worker-entry.ts", import.meta.url).href;
}
setCoreClient(client: CoreClient): void {
this.coreClient = client;
}
load(code: string, entryPoint: string, permissions: ModulePermissions): void {
// Terminate existing worker if reloading
const existing = this.modules.get(code);
if (existing?.worker) {
existing.worker.terminate();
}
const mod: Module = {
code,
entryPoint,
permissions,
status: "RUNNING",
});
console.error(`CoreDeno: module loaded: ${code}`);
status: "LOADING",
};
this.modules.set(code, mod);
// Resolve entry point URL for the module
const moduleUrl =
entryPoint.startsWith("file://") || entryPoint.startsWith("http")
? entryPoint
: "file://" + entryPoint;
// Build read permissions: worker-entry.ts dir + module source + declared reads
const readPerms: string[] = [
new URL(".", import.meta.url).pathname,
];
// Add the module's directory so it can be dynamically imported
if (!entryPoint.startsWith("http")) {
const modPath = entryPoint.startsWith("file://")
? entryPoint.slice(7)
: entryPoint;
// Add the module file's directory
const lastSlash = modPath.lastIndexOf("/");
if (lastSlash > 0) readPerms.push(modPath.slice(0, lastSlash + 1));
else readPerms.push(modPath);
}
if (permissions.read) readPerms.push(...permissions.read);
// Create Worker with permission sandbox
const worker = new Worker(this.workerEntryUrl, {
type: "module",
name: code,
// deno-lint-ignore no-explicit-any
deno: {
permissions: {
read: readPerms,
write: permissions.write ?? [],
net: permissions.net ?? [],
run: permissions.run ?? [],
env: false,
sys: false,
ffi: false,
},
},
} as any);
mod.worker = worker;
// I/O bridge: relay Worker RPC to CoreClient
worker.onmessage = async (e: MessageEvent) => {
const msg = e.data;
if (msg.type === "ready") {
worker.postMessage({ type: "load", url: moduleUrl });
return;
}
if (msg.type === "loaded") {
mod.status = msg.ok ? "RUNNING" : "ERRORED";
if (msg.ok) {
console.error(`CoreDeno: module running: ${code}`);
} else {
console.error(`CoreDeno: module error: ${code}: ${msg.error}`);
}
return;
}
if (msg.type === "rpc" && this.coreClient) {
try {
const result = await this.dispatchRPC(
code,
msg.method,
msg.params,
);
worker.postMessage({ type: "rpc_response", id: msg.id, result });
} catch (err) {
worker.postMessage({
type: "rpc_response",
id: msg.id,
error: err instanceof Error ? err.message : String(err),
});
}
}
};
worker.onerror = (e: ErrorEvent) => {
mod.status = "ERRORED";
console.error(`CoreDeno: worker error: ${code}: ${e.message}`);
};
console.error(`CoreDeno: module loading: ${code}`);
}
private async dispatchRPC(
moduleCode: string,
method: string,
params: Record<string, unknown>,
): Promise<unknown> {
const c = this.coreClient!;
switch (method) {
case "StoreGet":
return c.storeGet(params.group as string, params.key as string);
case "StoreSet":
return c.storeSet(
params.group as string,
params.key as string,
params.value as string,
);
case "FileRead":
return c.fileRead(params.path as string, moduleCode);
case "FileWrite":
return c.fileWrite(
params.path as string,
params.content as string,
moduleCode,
);
case "ProcessStart":
return c.processStart(
params.command as string,
params.args as string[],
moduleCode,
);
case "ProcessStop":
return c.processStop(params.process_id as string);
default:
throw new Error(`unknown RPC method: ${method}`);
}
}
unload(code: string): boolean {
const mod = this.modules.get(code);
if (!mod) return false;
if (mod.worker) {
mod.worker.terminate();
mod.worker = undefined;
}
mod.status = "STOPPED";
console.error(`CoreDeno: module unloaded: ${code}`);
return true;
@ -44,7 +193,10 @@ export class ModuleRegistry {
return this.modules.get(code)?.status ?? "UNKNOWN";
}
list(): Module[] {
return Array.from(this.modules.values());
list(): Array<{ code: string; status: ModuleStatus }> {
return Array.from(this.modules.values()).map((m) => ({
code: m.code,
status: m.status,
}));
}
}

View file

@ -94,7 +94,7 @@ interface RPCRequest {
method: string;
code?: string;
entry_point?: string;
permissions?: string[];
permissions?: { read?: string[]; write?: string[]; net?: string[]; run?: string[] };
process_id?: string;
}
@ -107,7 +107,7 @@ function dispatch(
registry.load(
req.code ?? "",
req.entry_point ?? "",
req.permissions ?? [],
req.permissions ?? {},
);
return { ok: true, error: "" };
}

View file

@ -0,0 +1,5 @@
// Test module — writes to store via I/O bridge to prove Workers work.
// Called by integration tests.
export async function init(core: any) {
await core.storeSet("test-module", "init", "ok");
}

View file

@ -0,0 +1,79 @@
// Worker bootstrap — loaded as entry point for every module Worker.
// Sets up the I/O bridge (postMessage ↔ parent relay), then dynamically
// imports the module and calls its init(core) function.
//
// The parent (ModuleRegistry) injects module_code into all gRPC calls,
// so modules can't spoof their identity.
// I/O bridge: request/response correlation over postMessage
const pending = new Map<number, { resolve: Function; reject: Function }>();
let nextId = 0;
function rpc(
method: string,
params: Record<string, unknown>,
): Promise<unknown> {
return new Promise((resolve, reject) => {
const id = ++nextId;
pending.set(id, { resolve, reject });
self.postMessage({ type: "rpc", id, method, params });
});
}
// Typed core object passed to module's init() function.
// Each method maps to a CoreService gRPC call relayed through the parent.
const core = {
storeGet(group: string, key: string) {
return rpc("StoreGet", { group, key });
},
storeSet(group: string, key: string, value: string) {
return rpc("StoreSet", { group, key, value });
},
fileRead(path: string) {
return rpc("FileRead", { path });
},
fileWrite(path: string, content: string) {
return rpc("FileWrite", { path, content });
},
processStart(command: string, args: string[]) {
return rpc("ProcessStart", { command, args });
},
processStop(processId: string) {
return rpc("ProcessStop", { process_id: processId });
},
};
// Handle messages from parent: RPC responses and load commands
self.addEventListener("message", async (e: MessageEvent) => {
const msg = e.data;
if (msg.type === "rpc_response") {
const p = pending.get(msg.id);
if (p) {
pending.delete(msg.id);
if (msg.error) p.reject(new Error(msg.error));
else p.resolve(msg.result);
}
return;
}
if (msg.type === "load") {
try {
const mod = await import(msg.url);
if (typeof mod.init === "function") {
await mod.init(core);
}
self.postMessage({ type: "loaded", ok: true });
} catch (err) {
self.postMessage({
type: "loaded",
ok: false,
error: err instanceof Error ? err.message : String(err),
});
}
return;
}
});
// Signal ready — parent will respond with {type: "load", url: "..."}
self.postMessage({ type: "ready" });