go/pkg/coredeno/runtime/server.ts
Claude ad6a466459
feat(coredeno): Tier 3 Worker isolation — sandboxed module loading with I/O bridge
Each module now runs in a real Deno Worker with per-module permission
sandboxing. The I/O bridge relays Worker postMessage calls through the
parent to CoreService gRPC, so modules can access store, files, and
processes without direct network/filesystem access.

- Worker bootstrap (worker-entry.ts): sets up RPC bridge, dynamically
  imports module, calls init(core) with typed I/O object
- ModuleRegistry rewritten: creates Workers with Deno permission
  constructor, handles LOADING → RUNNING → STOPPED lifecycle
- Structured ModulePermissions (read/write/net/run) replaces flat
  string array in Go→Deno JSON-RPC
- I/O bridge: Worker postMessage → parent dispatchRPC → CoreClient
  gRPC → response relayed back to Worker
- Test module proves end-to-end: Worker calls core.storeSet() →
  Go verifies value in store

40 unit tests + 3 integration tests (Tier 1 boot + Tier 2 bidir + Tier 3 Worker).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 00:48:16 +00:00

124 lines
3.1 KiB
TypeScript

// DenoService JSON-RPC server — Go calls Deno for module lifecycle management.
// Uses newline-delimited JSON over a raw Unix socket (Deno's http2 server is broken).
// Protocol: one JSON request per line; each response is a single JSON object followed by "\n".
import { ModuleRegistry } from "./modules.ts";
/**
 * Handle for a running DenoService socket server, as returned by
 * `startDenoServer`.
 */
export interface DenoServer {
/** Shuts the server down: stops the accept loop and closes the listening socket. */
close(): void;
}
/**
 * Starts the DenoService JSON-RPC server on a Unix domain socket.
 *
 * Any file already present at `socketPath` is removed first (best effort).
 * Connections are accepted in the background; each one is read as
 * newline-delimited JSON, every non-blank line is handed to `dispatch`, and
 * exactly one JSON line is written back per request. A request that fails to
 * parse or dispatch is answered with `{ "error": "<message>" }`.
 *
 * @param socketPath Filesystem path of the Unix socket to listen on.
 * @param registry Module registry passed through to `dispatch` for every request.
 * @returns A handle whose `close()` aborts the accept loop and closes the listener.
 */
export async function startDenoServer(
  socketPath: string,
  registry: ModuleRegistry,
): Promise<DenoServer> {
  // Remove a stale socket file left behind by a previous run.
  try {
    Deno.removeSync(socketPath);
  } catch {
    // Best effort: if removal fails, Deno.listen below reports the real problem.
  }

  const listener = Deno.listen({ transport: "unix", path: socketPath });
  // Stateless, so one encoder is shared by all connections and messages.
  const encoder = new TextEncoder();

  /** Serves a single connection until the peer disconnects or an I/O error occurs. */
  const handleConnection = async (conn: Deno.UnixConn): Promise<void> => {
    const reader = conn.readable.getReader();
    const writer = conn.writable.getWriter();
    // Per-connection decoder: `stream: true` keeps partial multi-byte
    // sequences between chunks, so the state must not be shared.
    const decoder = new TextDecoder();
    let pending = "";

    try {
      while (true) {
        const { value, done } = await reader.read();
        if (done) break;
        pending += decoder.decode(value, { stream: true });

        // Drain every complete line currently buffered.
        let newlineIdx: number;
        while ((newlineIdx = pending.indexOf("\n")) !== -1) {
          const line = pending.slice(0, newlineIdx);
          pending = pending.slice(newlineIdx + 1);
          if (!line.trim()) continue;

          // Build the reply first so that only parse/dispatch/serialize
          // failures become an error response. A failed socket write is a
          // connection-level error and must not trigger a second write.
          let payload: string;
          try {
            payload = JSON.stringify(dispatch(JSON.parse(line), registry));
          } catch (err) {
            payload = JSON.stringify({
              error: err instanceof Error ? err.message : String(err),
            });
          }
          await writer.write(encoder.encode(payload + "\n"));
        }
      }
    } catch {
      // Connection closed or error — expected during shutdown.
    } finally {
      // close() and cancel() return promises; they must be awaited inside the
      // try so a rejection on an already-broken stream is actually caught
      // instead of surfacing as an unhandled rejection.
      try {
        await writer.close();
      } catch {
        /* already closed or errored */
      }
      try {
        await reader.cancel();
      } catch {
        /* already closed or errored */
      }
    }
  };

  // Accept connections in the background.
  const abortController = new AbortController();
  void (async () => {
    try {
      for await (const conn of listener) {
        if (abortController.signal.aborted) {
          // Accepted while shutting down: release it instead of leaking it.
          try {
            conn.close();
          } catch {
            /* already closed */
          }
          break;
        }
        // Deliberately not awaited so connections are served concurrently;
        // handleConnection handles its own errors and never rejects.
        void handleConnection(conn);
      }
    } catch {
      // Listener closed.
    }
  })();

  return {
    close() {
      abortController.abort();
      listener.close();
    },
  };
}
/**
 * Shape of one decoded JSON-RPC request line. Every field except `method`
 * is optional; `dispatch` substitutes defaults for the ones it reads.
 */
interface RPCRequest {
  /** RPC name; `dispatch` recognises "LoadModule", "UnloadModule" and "ModuleStatus". */
  method: string;
  /** Module identifier handed to the registry's load/unload/status calls. */
  code?: string;
  /** Second argument to the registry's `load` call. */
  entry_point?: string;
  /** Structured permission lists forwarded to the registry's `load` call. */
  permissions?: {
    read?: string[];
    write?: string[];
    net?: string[];
    run?: string[];
  };
  /** Not read by any code visible in this file. */
  process_id?: string;
}
/**
 * Routes one decoded request to the matching registry operation and builds
 * the response object that is serialized back to the caller.
 *
 * - "LoadModule": calls `registry.load(code, entry_point, permissions)` and
 *   returns `{ ok: true, error: "" }`.
 * - "UnloadModule": returns `{ ok }` with the result of `registry.unload(code)`.
 * - "ModuleStatus": returns `{ code, status }` with the result of
 *   `registry.status(code)`.
 * - anything else: returns `{ error: "unknown method: <method>" }`.
 *
 * Missing `code` / `entry_point` default to `""`; missing `permissions`
 * default to `{}`. Exceptions thrown by the registry are not caught here.
 *
 * @param req Decoded request.
 * @param registry Registry the request operates on.
 * @returns The response payload for this request.
 */
function dispatch(
  req: RPCRequest,
  registry: ModuleRegistry,
): Record<string, unknown> {
  const method = req.method;

  if (method === "LoadModule") {
    const moduleCode = req.code ?? "";
    const entryPoint = req.entry_point ?? "";
    const grants = req.permissions ?? {};
    registry.load(moduleCode, entryPoint, grants);
    return { ok: true, error: "" };
  }

  if (method === "UnloadModule") {
    const ok = registry.unload(req.code ?? "");
    return { ok };
  }

  if (method === "ModuleStatus") {
    // `code` is echoed back exactly as received (possibly undefined);
    // only the registry lookup falls back to "".
    const code = req.code;
    const status = registry.status(code ?? "");
    return { code, status };
  }

  return { error: `unknown method: ${method}` };
}