diff --git a/pkg/coredeno/coredeno.go b/pkg/coredeno/coredeno.go index ee50ddb..2150377 100644 --- a/pkg/coredeno/coredeno.go +++ b/pkg/coredeno/coredeno.go @@ -13,12 +13,13 @@ import ( // Options configures the CoreDeno sidecar. type Options struct { - DenoPath string // path to deno binary (default: "deno") - SocketPath string // Unix socket path for gRPC - AppRoot string // app root directory (sandboxed I/O) - StoreDBPath string // SQLite DB path (default: AppRoot/.core/store.db) - PublicKey ed25519.PublicKey // ed25519 public key for manifest verification (optional) - SidecarArgs []string // args passed to the sidecar process + DenoPath string // path to deno binary (default: "deno") + SocketPath string // Unix socket path for Go's gRPC server (CoreService) + DenoSocketPath string // Unix socket path for Deno's gRPC server (DenoService) + AppRoot string // app root directory (sandboxed I/O) + StoreDBPath string // SQLite DB path (default: AppRoot/.core/store.db) + PublicKey ed25519.PublicKey // ed25519 public key for manifest verification (optional) + SidecarArgs []string // args passed to the sidecar process } // Permissions declares per-module Deno permission flags. @@ -74,6 +75,9 @@ func NewSidecar(opts Options) *Sidecar { if opts.SocketPath == "" { opts.SocketPath = DefaultSocketPath() } + if opts.DenoSocketPath == "" && opts.SocketPath != "" { + opts.DenoSocketPath = filepath.Join(filepath.Dir(opts.SocketPath), "deno.sock") + } if opts.StoreDBPath == "" && opts.AppRoot != "" { opts.StoreDBPath = filepath.Join(opts.AppRoot, ".core", "store.db") } diff --git a/pkg/coredeno/coredeno_test.go b/pkg/coredeno/coredeno_test.go index 1da31c8..a670dcc 100644 --- a/pkg/coredeno/coredeno_test.go +++ b/pkg/coredeno/coredeno_test.go @@ -80,3 +80,20 @@ func TestDefaultSocketPath_XDG(t *testing.T) { path := DefaultSocketPath() assert.Equal(t, "/run/user/1000/core/deno.sock", path) } + +func TestOptions_DenoSocketPath_Default_Good(t *testing.T) { + opts := Options{SocketPath: "/tmp/core/core.sock"} + sc := NewSidecar(opts) + assert.Equal(t, "/tmp/core/deno.sock", sc.opts.DenoSocketPath, + "DenoSocketPath should default to same dir as SocketPath with deno.sock") +} + +func TestOptions_DenoSocketPath_Explicit_Good(t *testing.T) { + opts := Options{ + SocketPath: "/tmp/core/core.sock", + DenoSocketPath: "/tmp/custom/deno.sock", + } + sc := NewSidecar(opts) + assert.Equal(t, "/tmp/custom/deno.sock", sc.opts.DenoSocketPath, + "Explicit DenoSocketPath should not be overridden") +} diff --git a/pkg/coredeno/denoclient.go b/pkg/coredeno/denoclient.go new file mode 100644 index 0000000..81b6952 --- /dev/null +++ b/pkg/coredeno/denoclient.go @@ -0,0 +1,127 @@ +package coredeno + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "sync" +) + +// DenoClient communicates with the Deno sidecar's JSON-RPC server over a Unix socket. +// Thread-safe: uses a mutex to serialize requests (one connection, request/response protocol). +type DenoClient struct { + mu sync.Mutex + conn net.Conn + reader *bufio.Reader +} + +// DialDeno connects to the Deno JSON-RPC server on the given Unix socket path. +func DialDeno(socketPath string) (*DenoClient, error) { + conn, err := net.Dial("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("deno dial: %w", err) + } + return &DenoClient{ + conn: conn, + reader: bufio.NewReader(conn), + }, nil +} + +// Close closes the underlying connection. +func (c *DenoClient) Close() error { + return c.conn.Close() +} + +func (c *DenoClient) call(req map[string]any) (map[string]any, error) { + c.mu.Lock() + defer c.mu.Unlock() + + data, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal: %w", err) + } + data = append(data, '\n') + + if _, err := c.conn.Write(data); err != nil { + return nil, fmt.Errorf("write: %w", err) + } + + line, err := c.reader.ReadBytes('\n') + if err != nil { + return nil, fmt.Errorf("read: %w", err) + } + + var resp map[string]any + if err := json.Unmarshal(line, &resp); err != nil { + return nil, fmt.Errorf("unmarshal: %w", err) + } + + if errMsg, ok := resp["error"].(string); ok && errMsg != "" { + return nil, fmt.Errorf("deno: %s", errMsg) + } + return resp, nil +} + +// LoadModuleResponse holds the result of a LoadModule call. +type LoadModuleResponse struct { + Ok bool + Error string +} + +// LoadModule tells Deno to load a module. +func (c *DenoClient) LoadModule(code, entryPoint string, permissions []string) (*LoadModuleResponse, error) { + resp, err := c.call(map[string]any{ + "method": "LoadModule", + "code": code, + "entry_point": entryPoint, + "permissions": permissions, + }) + if err != nil { + return nil, err + } + return &LoadModuleResponse{ + Ok: resp["ok"] == true, + Error: fmt.Sprint(resp["error"]), + }, nil +} + +// UnloadModuleResponse holds the result of an UnloadModule call. +type UnloadModuleResponse struct { + Ok bool +} + +// UnloadModule tells Deno to unload a module. +func (c *DenoClient) UnloadModule(code string) (*UnloadModuleResponse, error) { + resp, err := c.call(map[string]any{ + "method": "UnloadModule", + "code": code, + }) + if err != nil { + return nil, err + } + return &UnloadModuleResponse{ + Ok: resp["ok"] == true, + }, nil +} + +// ModuleStatusResponse holds the result of a ModuleStatus call. +type ModuleStatusResponse struct { + Code string + Status string +} + +// ModuleStatus queries the status of a module in the Deno runtime. +func (c *DenoClient) ModuleStatus(code string) (*ModuleStatusResponse, error) { + resp, err := c.call(map[string]any{ + "method": "ModuleStatus", + "code": code, + }) + if err != nil { + return nil, err + } + return &ModuleStatusResponse{ + Code: fmt.Sprint(resp["code"]), + Status: fmt.Sprint(resp["status"]), + }, nil +} diff --git a/pkg/coredeno/integration_test.go b/pkg/coredeno/integration_test.go index ce83fe0..b1c515d 100644 --- a/pkg/coredeno/integration_test.go +++ b/pkg/coredeno/integration_test.go @@ -18,16 +18,34 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func TestIntegration_FullBoot_Good(t *testing.T) { +// unused import guard +var _ = pb.NewCoreServiceClient + +func findDeno(t *testing.T) string { + t.Helper() denoPath, err := exec.LookPath("deno") if err != nil { - // Check ~/.deno/bin/deno home, _ := os.UserHomeDir() denoPath = filepath.Join(home, ".deno", "bin", "deno") if _, err := os.Stat(denoPath); err != nil { t.Skip("deno not installed") } } + return denoPath +} + +// runtimeEntryPoint returns the absolute path to runtime/main.ts. +func runtimeEntryPoint(t *testing.T) string { + t.Helper() + // We're in pkg/coredeno/ during test, runtime is a subdir + abs, err := filepath.Abs("runtime/main.ts") + require.NoError(t, err) + require.FileExists(t, abs) + return abs +} + +func TestIntegration_FullBoot_Good(t *testing.T) { + denoPath := findDeno(t) tmpDir := t.TempDir() sockPath := filepath.Join(tmpDir, "core.sock") @@ -43,21 +61,14 @@ permissions: read: ["./data/"] `), 0644)) - // Copy the runtime entry point - runtimeDir := filepath.Join(coreDir, "runtime") - require.NoError(t, os.MkdirAll(runtimeDir, 0755)) - src, err := os.ReadFile("runtime/main.ts") - require.NoError(t, err) - require.NoError(t, os.WriteFile(filepath.Join(runtimeDir, "main.ts"), src, 0644)) - - entryPoint := filepath.Join(runtimeDir, "main.ts") + entryPoint := runtimeEntryPoint(t) opts := Options{ DenoPath: denoPath, SocketPath: sockPath, AppRoot: tmpDir, StoreDBPath: ":memory:", - SidecarArgs: []string{"run", "--allow-env", entryPoint}, + SidecarArgs: []string{"run", "-A", entryPoint}, } c, err := core.New() @@ -68,7 +79,7 @@ permissions: require.NoError(t, err) svc := result.(*Service) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err = svc.OnStartup(ctx) @@ -108,3 +119,107 @@ permissions: assert.NoError(t, err) assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped") } + +func TestIntegration_Tier2_Bidirectional_Good(t *testing.T) { + denoPath := findDeno(t) + + tmpDir := t.TempDir() + sockPath := filepath.Join(tmpDir, "core.sock") + denoSockPath := filepath.Join(tmpDir, "deno.sock") + + // Write a manifest + coreDir := filepath.Join(tmpDir, ".core") + require.NoError(t, os.MkdirAll(coreDir, 0755)) + require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(` +code: tier2-test +name: Tier 2 Test +version: "1.0" +permissions: + read: ["./data/"] + run: ["echo"] +`), 0644)) + + entryPoint := runtimeEntryPoint(t) + + opts := Options{ + DenoPath: denoPath, + SocketPath: sockPath, + DenoSocketPath: denoSockPath, + AppRoot: tmpDir, + StoreDBPath: ":memory:", + SidecarArgs: []string{"run", "-A", entryPoint}, + } + + c, err := core.New() + require.NoError(t, err) + + factory := NewServiceFactory(opts) + result, err := factory(c) + require.NoError(t, err) + svc := result.(*Service) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err = svc.OnStartup(ctx) + require.NoError(t, err) + + // Verify both sockets appeared + require.Eventually(t, func() bool { + _, err := os.Stat(sockPath) + return err == nil + }, 10*time.Second, 50*time.Millisecond, "core socket should appear") + + require.Eventually(t, func() bool { + _, err := os.Stat(denoSockPath) + return err == nil + }, 10*time.Second, 50*time.Millisecond, "deno socket should appear") + + // Verify sidecar is running + assert.True(t, svc.sidecar.IsRunning(), "Deno sidecar should be running") + + // Verify DenoClient is connected + require.NotNil(t, svc.DenoClient(), "DenoClient should be connected") + + // Test Go → Deno: LoadModule + loadResp, err := svc.DenoClient().LoadModule("test-module", "/modules/test/main.ts", []string{"read", "net"}) + require.NoError(t, err) + assert.True(t, loadResp.Ok) + + // Test Go → Deno: ModuleStatus + statusResp, err := svc.DenoClient().ModuleStatus("test-module") + require.NoError(t, err) + assert.Equal(t, "test-module", statusResp.Code) + assert.Equal(t, "RUNNING", statusResp.Status) + + // Test Go → Deno: UnloadModule + unloadResp, err := svc.DenoClient().UnloadModule("test-module") + require.NoError(t, err) + assert.True(t, unloadResp.Ok) + + // Verify module is now STOPPED + statusResp2, err := svc.DenoClient().ModuleStatus("test-module") + require.NoError(t, err) + assert.Equal(t, "STOPPED", statusResp2.Status) + + // Verify CoreService gRPC still works (Deno wrote health check data) + conn, err := grpc.NewClient( + "unix://"+sockPath, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + coreClient := pb.NewCoreServiceClient(conn) + getResp, err := coreClient.StoreGet(ctx, &pb.StoreGetRequest{ + Group: "_coredeno", Key: "status", + }) + require.NoError(t, err) + assert.True(t, getResp.Found) + assert.Equal(t, "connected", getResp.Value, "Deno should have written health check") + + // Clean shutdown + err = svc.OnShutdown(context.Background()) + assert.NoError(t, err) + assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped") +} diff --git a/pkg/coredeno/lifecycle.go b/pkg/coredeno/lifecycle.go index f5975b5..1b58039 100644 --- a/pkg/coredeno/lifecycle.go +++ b/pkg/coredeno/lifecycle.go @@ -23,12 +23,17 @@ func (s *Sidecar) Start(ctx context.Context, args ...string) error { return fmt.Errorf("coredeno: mkdir %s: %w", sockDir, err) } - // Remove stale socket - os.Remove(s.opts.SocketPath) + // Remove stale Deno socket (the Core socket is managed by ListenGRPC) + if s.opts.DenoSocketPath != "" { + os.Remove(s.opts.DenoSocketPath) + } s.ctx, s.cancel = context.WithCancel(ctx) s.cmd = exec.CommandContext(s.ctx, s.opts.DenoPath, args...) - s.cmd.Env = append(os.Environ(), "CORE_SOCKET="+s.opts.SocketPath) + s.cmd.Env = append(os.Environ(), + "CORE_SOCKET="+s.opts.SocketPath, + "DENO_SOCKET="+s.opts.DenoSocketPath, + ) s.done = make(chan struct{}) if err := s.cmd.Start(); err != nil { s.cmd = nil diff --git a/pkg/coredeno/lifecycle_test.go b/pkg/coredeno/lifecycle_test.go index ef14c21..f9347fc 100644 --- a/pkg/coredeno/lifecycle_test.go +++ b/pkg/coredeno/lifecycle_test.go @@ -68,6 +68,42 @@ func TestStart_Good_EnvPassedToChild(t *testing.T) { assert.True(t, found, "child process should receive CORE_SOCKET=%s", sockPath) } +func TestStart_Good_DenoSocketEnv(t *testing.T) { + sockDir := t.TempDir() + sockPath := filepath.Join(sockDir, "core.sock") + denoSockPath := filepath.Join(sockDir, "deno.sock") + + sc := NewSidecar(Options{ + DenoPath: "sleep", + SocketPath: sockPath, + DenoSocketPath: denoSockPath, + }) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err := sc.Start(ctx, "10") + require.NoError(t, err) + defer sc.Stop() + + sc.mu.RLock() + env := sc.cmd.Env + sc.mu.RUnlock() + + foundCore := false + foundDeno := false + for _, e := range env { + if e == "CORE_SOCKET="+sockPath { + foundCore = true + } + if e == "DENO_SOCKET="+denoSockPath { + foundDeno = true + } + } + assert.True(t, foundCore, "child should receive CORE_SOCKET") + assert.True(t, foundDeno, "child should receive DENO_SOCKET") +} + func TestSocketDirCreated_Good(t *testing.T) { dir := t.TempDir() sockPath := filepath.Join(dir, "sub", "deno.sock") diff --git a/pkg/coredeno/runtime/client.ts b/pkg/coredeno/runtime/client.ts new file mode 100644 index 0000000..8afe006 --- /dev/null +++ b/pkg/coredeno/runtime/client.ts @@ -0,0 +1,95 @@ +// CoreService gRPC client — Deno calls Go for I/O operations. +// All filesystem, store, and process operations route through this client. + +import * as grpc from "@grpc/grpc-js"; +import * as protoLoader from "@grpc/proto-loader"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const PROTO_PATH = join(__dirname, "..", "proto", "coredeno.proto"); + +let packageDef: protoLoader.PackageDefinition | null = null; + +function getProto(): any { + if (!packageDef) { + packageDef = protoLoader.loadSync(PROTO_PATH, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + }); + } + return grpc.loadPackageDefinition(packageDef).coredeno as any; +} + +export interface CoreClient { + raw: any; + storeGet(group: string, key: string): Promise<{ value: string; found: boolean }>; + storeSet(group: string, key: string, value: string): Promise<{ ok: boolean }>; + fileRead(path: string, moduleCode: string): Promise<{ content: string }>; + fileWrite(path: string, content: string, moduleCode: string): Promise<{ ok: boolean }>; + fileList(path: string, moduleCode: string): Promise<{ entries: Array<{ name: string; is_dir: boolean; size: number }> }>; + fileDelete(path: string, moduleCode: string): Promise<{ ok: boolean }>; + processStart(command: string, args: string[], moduleCode: string): Promise<{ process_id: string }>; + processStop(processId: string): Promise<{ ok: boolean }>; + close(): void; +} + +function promisify(client: any, method: string, request: any): Promise { + return new Promise((resolve, reject) => { + client[method](request, (err: Error | null, response: T) => { + if (err) reject(err); + else resolve(response); + }); + }); +} + +export function createCoreClient(socketPath: string): CoreClient { + const proto = getProto(); + const client = new proto.CoreService( + `unix:${socketPath}`, + grpc.credentials.createInsecure(), + ); + + return { + raw: client, + + storeGet(group: string, key: string) { + return promisify(client, "StoreGet", { group, key }); + }, + + storeSet(group: string, key: string, value: string) { + return promisify(client, "StoreSet", { group, key, value }); + }, + + fileRead(path: string, moduleCode: string) { + return promisify(client, "FileRead", { path, module_code: moduleCode }); + }, + + fileWrite(path: string, content: string, moduleCode: string) { + return promisify(client, "FileWrite", { path, content, module_code: moduleCode }); + }, + + fileList(path: string, moduleCode: string) { + return promisify(client, "FileList", { path, module_code: moduleCode }); + }, + + fileDelete(path: string, moduleCode: string) { + return promisify(client, "FileDelete", { path, module_code: moduleCode }); + }, + + processStart(command: string, args: string[], moduleCode: string) { + return promisify(client, "ProcessStart", { command, args, module_code: moduleCode }); + }, + + processStop(processId: string) { + return promisify(client, "ProcessStop", { process_id: processId }); + }, + + close() { + client.close(); + }, + }; +} diff --git a/pkg/coredeno/runtime/deno.json b/pkg/coredeno/runtime/deno.json new file mode 100644 index 0000000..13f798e --- /dev/null +++ b/pkg/coredeno/runtime/deno.json @@ -0,0 +1,7 @@ +{ + "imports": { + "@grpc/grpc-js": "npm:@grpc/grpc-js@^1.12", + "@grpc/proto-loader": "npm:@grpc/proto-loader@^0.7" + }, + "nodeModulesDir": "none" +} diff --git a/pkg/coredeno/runtime/deno.lock b/pkg/coredeno/runtime/deno.lock new file mode 100644 index 0000000..b02e151 --- /dev/null +++ b/pkg/coredeno/runtime/deno.lock @@ -0,0 +1,193 @@ +{ + "version": "5", + "specifiers": { + "npm:@grpc/grpc-js@^1.12.0": "1.14.3", + "npm:@grpc/proto-loader@0.7": "0.7.15" + }, + "npm": { + "@grpc/grpc-js@1.14.3": { + "integrity": "sha512-Iq8QQQ/7X3Sac15oB6p0FmUg/klxQvXLeileoqrTRGJYLV+/9tubbr9ipz0GKHjmXVsgFPo/+W+2cA8eNcR+XA==", + "dependencies": [ + "@grpc/proto-loader@0.8.0", + "@js-sdsl/ordered-map" + ] + }, + "@grpc/proto-loader@0.7.15": { + "integrity": "sha512-tMXdRCfYVixjuFK+Hk0Q1s38gV9zDiDJfWL3h1rv4Qc39oILCu1TRTDt7+fGUI8K4G1Fj125Hx/ru3azECWTyQ==", + "dependencies": [ + "lodash.camelcase", + "long", + "protobufjs", + "yargs" + ], + "bin": true + }, + "@grpc/proto-loader@0.8.0": { + "integrity": "sha512-rc1hOQtjIWGxcxpb9aHAfLpIctjEnsDehj0DAiVfBlmT84uvR0uUtN2hEi/ecvWVjXUGf5qPF4qEgiLOx1YIMQ==", + "dependencies": [ + "lodash.camelcase", + "long", + "protobufjs", + "yargs" + ], + "bin": true + }, + "@js-sdsl/ordered-map@4.4.2": { + "integrity": "sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw==" + }, + "@protobufjs/aspromise@1.1.2": { + "integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==" + }, + "@protobufjs/base64@1.1.2": { + "integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==" + }, + "@protobufjs/codegen@2.0.4": { + "integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==" + }, + "@protobufjs/eventemitter@1.1.0": { + "integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==" + }, + "@protobufjs/fetch@1.1.0": { + "integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==", + "dependencies": [ + "@protobufjs/aspromise", + "@protobufjs/inquire" + ] + }, + "@protobufjs/float@1.0.2": { + "integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==" + }, + "@protobufjs/inquire@1.1.0": { + "integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==" + }, + "@protobufjs/path@1.1.2": { + "integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==" + }, + "@protobufjs/pool@1.1.0": { + "integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==" + }, + "@protobufjs/utf8@1.1.0": { + "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==" + }, + "@types/node@25.2.3": { + "integrity": "sha512-m0jEgYlYz+mDJZ2+F4v8D1AyQb+QzsNqRuI7xg1VQX/KlKS0qT9r1Mo16yo5F/MtifXFgaofIFsdFMox2SxIbQ==", + "dependencies": [ + "undici-types" + ] + }, + "ansi-regex@5.0.1": { + "integrity": "sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==" + }, + "ansi-styles@4.3.0": { + "integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==", + "dependencies": [ + "color-convert" + ] + }, + "cliui@8.0.1": { + "integrity": "sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==", + "dependencies": [ + "string-width", + "strip-ansi", + "wrap-ansi" + ] + }, + "color-convert@2.0.1": { + "integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==", + "dependencies": [ + "color-name" + ] + }, + "color-name@1.1.4": { + "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==" + }, + "emoji-regex@8.0.0": { + "integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A==" + }, + "escalade@3.2.0": { + "integrity": "sha512-WUj2qlxaQtO4g6Pq5c29GTcWGDyd8itL8zTlipgECz3JesAiiOKotd8JU6otB3PACgG6xkJUyVhboMS+bje/jA==" + }, + "get-caller-file@2.0.5": { + "integrity": "sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg==" + }, + "is-fullwidth-code-point@3.0.0": { + "integrity": "sha512-zymm5+u+sCsSWyD9qNaejV3DFvhCKclKdizYaJUuHA83RLjb7nSuGnddCHGv0hk+KY7BMAlsWeK4Ueg6EV6XQg==" + }, + "lodash.camelcase@4.3.0": { + "integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==" + }, + "long@5.3.2": { + "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==" + }, + "protobufjs@7.5.4": { + "integrity": "sha512-CvexbZtbov6jW2eXAvLukXjXUW1TzFaivC46BpWc/3BpcCysb5Vffu+B3XHMm8lVEuy2Mm4XGex8hBSg1yapPg==", + "dependencies": [ + "@protobufjs/aspromise", + "@protobufjs/base64", + "@protobufjs/codegen", + "@protobufjs/eventemitter", + "@protobufjs/fetch", + "@protobufjs/float", + "@protobufjs/inquire", + "@protobufjs/path", + "@protobufjs/pool", + "@protobufjs/utf8", + "@types/node", + "long" + ], + "scripts": true + }, + "require-directory@2.1.1": { + "integrity": "sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q==" + }, + "string-width@4.2.3": { + "integrity": "sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==", + "dependencies": [ + "emoji-regex", + "is-fullwidth-code-point", + "strip-ansi" + ] + }, + "strip-ansi@6.0.1": { + "integrity": "sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==", + "dependencies": [ + "ansi-regex" + ] + }, + "undici-types@7.16.0": { + "integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==" + }, + "wrap-ansi@7.0.0": { + "integrity": "sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==", + "dependencies": [ + "ansi-styles", + "string-width", + "strip-ansi" + ] + }, + "y18n@5.0.8": { + "integrity": "sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==" + }, + "yargs-parser@21.1.1": { + "integrity": "sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw==" + }, + "yargs@17.7.2": { + "integrity": "sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==", + "dependencies": [ + "cliui", + "escalade", + "get-caller-file", + "require-directory", + "string-width", + "y18n", + "yargs-parser" + ] + } + }, + "workspace": { + "dependencies": [ + "npm:@grpc/grpc-js@^1.12.0", + "npm:@grpc/proto-loader@0.7" + ] + } +} diff --git a/pkg/coredeno/runtime/main.ts b/pkg/coredeno/runtime/main.ts index c1c6f93..ca0aba4 100644 --- a/pkg/coredeno/runtime/main.ts +++ b/pkg/coredeno/runtime/main.ts @@ -2,19 +2,90 @@ // Connects to CoreGO via gRPC over Unix socket. // Implements DenoService for module lifecycle management. -const socketPath = Deno.env.get("CORE_SOCKET"); -if (!socketPath) { +// Must be first import — patches http2 before @grpc/grpc-js loads. +import "./polyfill.ts"; + +import { createCoreClient, type CoreClient } from "./client.ts"; +import { startDenoServer, type DenoServer } from "./server.ts"; +import { ModuleRegistry } from "./modules.ts"; + +// Read required environment variables +const coreSocket = Deno.env.get("CORE_SOCKET"); +if (!coreSocket) { console.error("FATAL: CORE_SOCKET environment variable not set"); Deno.exit(1); } -console.error(`CoreDeno: connecting to ${socketPath}`); +const denoSocket = Deno.env.get("DENO_SOCKET"); +if (!denoSocket) { + console.error("FATAL: DENO_SOCKET environment variable not set"); + Deno.exit(1); +} -// Tier 1: signal readiness and stay alive. -// Tier 2 adds the gRPC client and DenoService implementation. +console.error(`CoreDeno: CORE_SOCKET=${coreSocket}`); +console.error(`CoreDeno: DENO_SOCKET=${denoSocket}`); + +// 1. Create module registry +const registry = new ModuleRegistry(); + +// 2. Start DenoService server (Go calls us here via JSON-RPC over Unix socket) +let denoServer: DenoServer; +try { + denoServer = await startDenoServer(denoSocket, registry); + console.error("CoreDeno: DenoService server started"); +} catch (err) { + console.error(`FATAL: failed to start DenoService server: ${err}`); + Deno.exit(1); +} + +// 3. Connect to CoreService (we call Go here) with retry +let coreClient: CoreClient; +{ + coreClient = createCoreClient(coreSocket); + const maxRetries = 20; + let connected = false; + let lastErr: unknown; + for (let i = 0; i < maxRetries; i++) { + try { + const timeoutCall = (p: Promise): Promise => + Promise.race([ + p, + new Promise((_, reject) => + setTimeout(() => reject(new Error("call timeout")), 2000), + ), + ]); + await timeoutCall( + coreClient.storeSet("_coredeno", "status", "connected"), + ); + const resp = await timeoutCall( + coreClient.storeGet("_coredeno", "status"), + ); + if (resp.found && resp.value === "connected") { + connected = true; + break; + } + } catch (err) { + lastErr = err; + if (i < 3 || i === 9 || i === 19) { + console.error(`CoreDeno: retry ${i}: ${err}`); + } + } + await new Promise((r) => setTimeout(r, 250)); + } + if (!connected) { + console.error( + `FATAL: failed to connect to CoreService after retries, last error: ${lastErr}`, + ); + denoServer.close(); + Deno.exit(1); + } + console.error("CoreDeno: CoreService client connected"); +} + +// 4. Signal readiness console.error("CoreDeno: ready"); -// Keep alive until parent sends SIGTERM +// 5. Keep alive until SIGTERM const ac = new AbortController(); Deno.addSignalListener("SIGTERM", () => { console.error("CoreDeno: shutting down"); @@ -26,5 +97,7 @@ try { ac.signal.addEventListener("abort", () => reject(new Error("shutdown"))); }); } catch { - // Clean exit on SIGTERM + // Clean shutdown + coreClient.close(); + denoServer.close(); } diff --git a/pkg/coredeno/runtime/modules.ts b/pkg/coredeno/runtime/modules.ts new file mode 100644 index 0000000..a53bd4b --- /dev/null +++ b/pkg/coredeno/runtime/modules.ts @@ -0,0 +1,50 @@ +// Module registry — tracks loaded modules and their lifecycle status. +// Tier 2: status tracking only. Tier 3 adds real Deno worker isolates. + +export type ModuleStatus = "UNKNOWN" | "LOADING" | "RUNNING" | "STOPPED" | "ERRORED"; + +// Status enum values matching the proto definition. +export const StatusEnum: Record = { + UNKNOWN: 0, + LOADING: 1, + RUNNING: 2, + STOPPED: 3, + ERRORED: 4, +}; + +export interface Module { + code: string; + entryPoint: string; + permissions: string[]; + status: ModuleStatus; +} + +export class ModuleRegistry { + private modules = new Map(); + + load(code: string, entryPoint: string, permissions: string[]): void { + this.modules.set(code, { + code, + entryPoint, + permissions, + status: "RUNNING", + }); + console.error(`CoreDeno: module loaded: ${code}`); + } + + unload(code: string): boolean { + const mod = this.modules.get(code); + if (!mod) return false; + mod.status = "STOPPED"; + console.error(`CoreDeno: module unloaded: ${code}`); + return true; + } + + status(code: string): ModuleStatus { + return this.modules.get(code)?.status ?? "UNKNOWN"; + } + + list(): Module[] { + return Array.from(this.modules.values()); + } +} diff --git a/pkg/coredeno/runtime/polyfill.ts b/pkg/coredeno/runtime/polyfill.ts new file mode 100644 index 0000000..a3ef4f9 --- /dev/null +++ b/pkg/coredeno/runtime/polyfill.ts @@ -0,0 +1,94 @@ +// Deno http2 + grpc-js polyfill — must be imported BEFORE @grpc/grpc-js. +// +// Two issues with Deno 2.x node compat: +// 1. http2.getDefaultSettings throws "Not implemented" +// 2. grpc-js's createConnection returns a socket that reports readyState="open" +// but never emits "connect", causing http2 sessions to hang forever. +// Fix: wrap createConnection to emit "connect" on next tick for open sockets. + +import http2 from "node:http2"; + +// Fix 1: getDefaultSettings stub +(http2 as any).getDefaultSettings = () => ({ + headerTableSize: 4096, + enablePush: true, + initialWindowSize: 65535, + maxFrameSize: 16384, + maxConcurrentStreams: 0xffffffff, + maxHeaderListSize: 65535, + maxHeaderSize: 65535, + enableConnectProtocol: false, +}); + +// Fix 2: grpc-js (transport.js line 536) passes an already-connected socket +// to http2.connect via createConnection. Deno's http2 never completes the +// HTTP/2 handshake because it expects a "connect" event from the socket, +// which already fired. Emitting "connect" again causes "Busy: Unix socket +// is currently in use" in Deno's internal http2. +// +// Workaround: track Unix socket paths via net.connect intercept, then in +// createConnection, return a FRESH socket. Keep the original socket alive +// (grpc-js has close listeners on it) but unused for data. +import net from "node:net"; + +const socketPathMap = new WeakMap(); +const origNetConnect = net.connect; +(net as any).connect = function (...args: any[]) { + const sock = origNetConnect.apply(this, args as any); + if (args[0] && typeof args[0] === "object" && args[0].path) { + socketPathMap.set(sock, args[0].path); + } + return sock; +}; + +// Fix 3: Deno's http2 client never fires "remoteSettings" event, which +// grpc-js waits for before marking the transport as READY. +// Workaround: emit "remoteSettings" after "connect" with reasonable defaults. +const origConnect = http2.connect; +(http2 as any).connect = function ( + authority: any, + options: any, + ...rest: any[] +) { + // For Unix sockets: replace pre-connected socket with fresh one + if (options?.createConnection) { + const origCC = options.createConnection; + options = { + ...options, + createConnection(...ccArgs: any[]) { + const origSock = origCC.apply(this, ccArgs); + const unixPath = socketPathMap.get(origSock); + if ( + unixPath && + !origSock.connecting && + origSock.readyState === "open" + ) { + const freshSock = net.connect({ path: unixPath }); + freshSock.on("close", () => origSock.destroy()); + return freshSock; + } + return origSock; + }, + }; + } + + const session = origConnect.call(this, authority, options, ...rest); + + // Emit remoteSettings after connect — Deno's http2 doesn't emit it + session.once("connect", () => { + if (!session.destroyed && !session.closed) { + const settings = { + headerTableSize: 4096, + enablePush: false, + initialWindowSize: 65535, + maxFrameSize: 16384, + maxConcurrentStreams: 100, + maxHeaderListSize: 8192, + maxHeaderSize: 8192, + }; + process.nextTick(() => session.emit("remoteSettings", settings)); + } + }); + + return session; +}; diff --git a/pkg/coredeno/runtime/server.ts b/pkg/coredeno/runtime/server.ts new file mode 100644 index 0000000..81065a2 --- /dev/null +++ b/pkg/coredeno/runtime/server.ts @@ -0,0 +1,124 @@ +// DenoService JSON-RPC server — Go calls Deno for module lifecycle management. +// Uses length-prefixed JSON over raw Unix socket (Deno's http2 server is broken). +// Protocol: 4-byte big-endian length + JSON payload, newline-delimited. + +import { ModuleRegistry } from "./modules.ts"; + +export interface DenoServer { + close(): void; +} + +export async function startDenoServer( + socketPath: string, + registry: ModuleRegistry, +): Promise { + // Remove stale socket + try { + Deno.removeSync(socketPath); + } catch { + // ignore + } + + const listener = Deno.listen({ transport: "unix", path: socketPath }); + + const handleConnection = async (conn: Deno.UnixConn) => { + const reader = conn.readable.getReader(); + const writer = conn.writable.getWriter(); + const decoder = new TextDecoder(); + let buffer = ""; + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + // Process complete lines (newline-delimited JSON) + let newlineIdx: number; + while ((newlineIdx = buffer.indexOf("\n")) !== -1) { + const line = buffer.slice(0, newlineIdx); + buffer = buffer.slice(newlineIdx + 1); + + if (!line.trim()) continue; + + try { + const req = JSON.parse(line); + const resp = dispatch(req, registry); + await writer.write( + new TextEncoder().encode(JSON.stringify(resp) + "\n"), + ); + } catch (err) { + const errResp = { + error: err instanceof Error ? err.message : String(err), + }; + await writer.write( + new TextEncoder().encode(JSON.stringify(errResp) + "\n"), + ); + } + } + } + } catch { + // Connection closed or error — expected during shutdown + } finally { + try { + writer.close(); + } catch { + /* already closed */ + } + } + }; + + // Accept connections in background + const abortController = new AbortController(); + (async () => { + try { + for await (const conn of listener) { + if (abortController.signal.aborted) break; + handleConnection(conn); + } + } catch { + // Listener closed + } + })(); + + return { + close() { + abortController.abort(); + listener.close(); + }, + }; +} + +interface RPCRequest { + method: string; + code?: string; + entry_point?: string; + permissions?: string[]; + process_id?: string; +} + +function dispatch( + req: RPCRequest, + registry: ModuleRegistry, +): Record { + switch (req.method) { + case "LoadModule": { + registry.load( + req.code ?? "", + req.entry_point ?? "", + req.permissions ?? [], + ); + return { ok: true, error: "" }; + } + case "UnloadModule": { + const ok = registry.unload(req.code ?? ""); + return { ok }; + } + case "ModuleStatus": { + return { code: req.code, status: registry.status(req.code ?? "") }; + } + default: + return { error: `unknown method: ${req.method}` }; + } +} diff --git a/pkg/coredeno/server.go b/pkg/coredeno/server.go index 31395a1..df040bf 100644 --- a/pkg/coredeno/server.go +++ b/pkg/coredeno/server.go @@ -8,8 +8,27 @@ import ( "forge.lthn.ai/core/go/pkg/io" "forge.lthn.ai/core/go/pkg/manifest" "forge.lthn.ai/core/go/pkg/store" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +// ProcessRunner abstracts process management for the gRPC server. +// Satisfied by *process.Service. +type ProcessRunner interface { + Start(ctx context.Context, command string, args ...string) (ProcessHandle, error) + Kill(id string) error +} + +// ProcessHandle is returned by ProcessRunner.Start. +type ProcessHandle interface { + Info() ProcessInfo +} + +// ProcessInfo is the subset of process info the server needs. +type ProcessInfo struct { + ID string +} + // Server implements the CoreService gRPC interface with permission gating. // Every I/O request is checked against the calling module's declared permissions. type Server struct { @@ -17,6 +36,7 @@ type Server struct { medium io.Medium store *store.Store manifests map[string]*manifest.Manifest + processes ProcessRunner } // NewServer creates a CoreService server backed by the given Medium and Store. @@ -129,3 +149,38 @@ func (s *Server) StoreSet(_ context.Context, req *pb.StoreSetRequest) (*pb.Store } return &pb.StoreSetResponse{Ok: true}, nil } + +// SetProcessRunner sets the process runner for ProcessStart/ProcessStop. +func (s *Server) SetProcessRunner(pr ProcessRunner) { + s.processes = pr +} + +// ProcessStart implements CoreService.ProcessStart with permission gating. +func (s *Server) ProcessStart(ctx context.Context, req *pb.ProcessStartRequest) (*pb.ProcessStartResponse, error) { + if s.processes == nil { + return nil, status.Error(codes.Unimplemented, "process service not available") + } + m, err := s.getManifest(req.ModuleCode) + if err != nil { + return nil, err + } + if !CheckRun(req.Command, m.Permissions.Run) { + return nil, fmt.Errorf("permission denied: %s cannot run %s", req.ModuleCode, req.Command) + } + proc, err := s.processes.Start(ctx, req.Command, req.Args...) + if err != nil { + return nil, fmt.Errorf("process start: %w", err) + } + return &pb.ProcessStartResponse{ProcessId: proc.Info().ID}, nil +} + +// ProcessStop implements CoreService.ProcessStop. +func (s *Server) ProcessStop(_ context.Context, req *pb.ProcessStopRequest) (*pb.ProcessStopResponse, error) { + if s.processes == nil { + return nil, status.Error(codes.Unimplemented, "process service not available") + } + if err := s.processes.Kill(req.ProcessId); err != nil { + return nil, fmt.Errorf("process stop: %w", err) + } + return &pb.ProcessStopResponse{Ok: true}, nil +} diff --git a/pkg/coredeno/server_test.go b/pkg/coredeno/server_test.go index 6438f33..276e064 100644 --- a/pkg/coredeno/server_test.go +++ b/pkg/coredeno/server_test.go @@ -2,16 +2,48 @@ package coredeno import ( "context" + "fmt" "testing" + pb "forge.lthn.ai/core/go/pkg/coredeno/proto" "forge.lthn.ai/core/go/pkg/io" "forge.lthn.ai/core/go/pkg/manifest" - pb "forge.lthn.ai/core/go/pkg/coredeno/proto" "forge.lthn.ai/core/go/pkg/store" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) +// mockProcessRunner implements ProcessRunner for testing. +type mockProcessRunner struct { + started map[string]bool + nextID int +} + +func newMockProcessRunner() *mockProcessRunner { + return &mockProcessRunner{started: make(map[string]bool)} +} + +func (m *mockProcessRunner) Start(_ context.Context, command string, args ...string) (ProcessHandle, error) { + m.nextID++ + id := fmt.Sprintf("proc-%d", m.nextID) + m.started[id] = true + return &mockProcessHandle{id: id}, nil +} + +func (m *mockProcessRunner) Kill(id string) error { + if !m.started[id] { + return fmt.Errorf("process not found: %s", id) + } + delete(m.started, id) + return nil +} + +type mockProcessHandle struct{ id string } + +func (h *mockProcessHandle) Info() ProcessInfo { return ProcessInfo{ID: h.id} } + func newTestServer(t *testing.T) *Server { t.Helper() medium := io.NewMockMedium() @@ -95,3 +127,74 @@ func TestStoreGet_Good_NotFound(t *testing.T) { require.NoError(t, err) assert.False(t, resp.Found) } + +func newTestServerWithProcess(t *testing.T) (*Server, *mockProcessRunner) { + t.Helper() + srv := newTestServer(t) + srv.RegisterModule(&manifest.Manifest{ + Code: "runner-mod", + Permissions: manifest.Permissions{ + Run: []string{"echo", "ls"}, + }, + }) + pr := newMockProcessRunner() + srv.SetProcessRunner(pr) + return srv, pr +} + +func TestProcessStart_Good(t *testing.T) { + srv, _ := newTestServerWithProcess(t) + resp, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{ + Command: "echo", Args: []string{"hello"}, ModuleCode: "runner-mod", + }) + require.NoError(t, err) + assert.NotEmpty(t, resp.ProcessId) +} + +func TestProcessStart_Bad_PermissionDenied(t *testing.T) { + srv, _ := newTestServerWithProcess(t) + _, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{ + Command: "rm", Args: []string{"-rf", "/"}, ModuleCode: "runner-mod", + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "permission denied") +} + +func TestProcessStart_Bad_NoProcessService(t *testing.T) { + srv := newTestServer(t) + srv.RegisterModule(&manifest.Manifest{ + Code: "no-proc-mod", + Permissions: manifest.Permissions{Run: []string{"echo"}}, + }) + _, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{ + Command: "echo", ModuleCode: "no-proc-mod", + }) + assert.Error(t, err) + st, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, codes.Unimplemented, st.Code()) +} + +func TestProcessStop_Good(t *testing.T) { + srv, _ := newTestServerWithProcess(t) + // Start a process first + startResp, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{ + Command: "echo", ModuleCode: "runner-mod", + }) + require.NoError(t, err) + + // Stop it + resp, err := srv.ProcessStop(context.Background(), &pb.ProcessStopRequest{ + ProcessId: startResp.ProcessId, + }) + require.NoError(t, err) + assert.True(t, resp.Ok) +} + +func TestProcessStop_Bad_NotFound(t *testing.T) { + srv, _ := newTestServerWithProcess(t) + _, err := srv.ProcessStop(context.Background(), &pb.ProcessStopRequest{ + ProcessId: "nonexistent", + }) + assert.Error(t, err) +} diff --git a/pkg/coredeno/service.go b/pkg/coredeno/service.go index 9bc4d85..fe6bd70 100644 --- a/pkg/coredeno/service.go +++ b/pkg/coredeno/service.go @@ -3,6 +3,8 @@ package coredeno import ( "context" "fmt" + "os" + "time" core "forge.lthn.ai/core/go/pkg/framework/core" "forge.lthn.ai/core/go/pkg/io" @@ -23,6 +25,7 @@ type Service struct { store *store.Store grpcCancel context.CancelFunc grpcDone chan error + denoClient *DenoClient } // NewServiceFactory returns a factory function for framework registration via WithService. @@ -91,9 +94,26 @@ func (s *Service) OnStartup(ctx context.Context) error { // 6. Start sidecar (if args provided) if len(opts.SidecarArgs) > 0 { + // Wait for core socket so sidecar can connect to our gRPC server + if err := waitForSocket(ctx, opts.SocketPath, 5*time.Second); err != nil { + return fmt.Errorf("coredeno: core socket: %w", err) + } + if err := s.sidecar.Start(ctx, opts.SidecarArgs...); err != nil { return fmt.Errorf("coredeno: sidecar: %w", err) } + + // 7. Wait for Deno's server and connect as client + if opts.DenoSocketPath != "" { + if err := waitForSocket(ctx, opts.DenoSocketPath, 10*time.Second); err != nil { + return fmt.Errorf("coredeno: deno socket: %w", err) + } + dc, err := DialDeno(opts.DenoSocketPath) + if err != nil { + return fmt.Errorf("coredeno: deno client: %w", err) + } + s.denoClient = dc + } } return nil @@ -101,7 +121,12 @@ func (s *Service) OnStartup(ctx context.Context) error { // OnShutdown stops the CoreDeno subsystem. Called by the framework on app shutdown. func (s *Service) OnShutdown(_ context.Context) error { - // Stop sidecar first + // Close Deno client connection + if s.denoClient != nil { + s.denoClient.Close() + } + + // Stop sidecar _ = s.sidecar.Stop() // Stop gRPC listener @@ -127,3 +152,27 @@ func (s *Service) Sidecar() *Sidecar { func (s *Service) GRPCServer() *Server { return s.grpcServer } + +// DenoClient returns the DenoService client for calling the Deno sidecar. +// Returns nil if the sidecar was not started or has no DenoSocketPath. +func (s *Service) DenoClient() *DenoClient { + return s.denoClient +} + +// waitForSocket polls until a Unix socket file appears or the context/timeout expires. +func waitForSocket(ctx context.Context, path string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for { + if _, err := os.Stat(path); err == nil { + return nil + } + if time.Now().After(deadline) { + return fmt.Errorf("timeout waiting for socket %s", path) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(50 * time.Millisecond): + } + } +}