Compare commits
4 commits
7d047fbdcc
...
9899398153
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9899398153 | ||
|
|
ad6a466459 | ||
|
|
af98accc03 | ||
|
|
2f246ad053 |
29 changed files with 2794 additions and 32 deletions
4
go.mod
4
go.mod
|
|
@ -28,6 +28,8 @@ require (
|
|||
golang.org/x/net v0.50.0
|
||||
golang.org/x/term v0.40.0
|
||||
golang.org/x/text v0.34.0
|
||||
google.golang.org/grpc v1.79.1
|
||||
google.golang.org/protobuf v1.36.11
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
modernc.org/sqlite v1.45.0
|
||||
)
|
||||
|
|
@ -113,8 +115,6 @@ require (
|
|||
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
|
||||
gonum.org/v1/gonum v0.17.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
|
||||
google.golang.org/grpc v1.79.1 // indirect
|
||||
google.golang.org/protobuf v1.36.11 // indirect
|
||||
modernc.org/libc v1.67.7 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.11.0 // indirect
|
||||
|
|
|
|||
21
go.sum
21
go.sum
|
|
@ -52,6 +52,8 @@ github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo
|
|||
github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
|
||||
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
|
||||
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8=
|
||||
github.com/cloudflare/circl v1.6.3/go.mod h1:2eXP6Qfat4O/Yhh8BznvKnJ+uzEoTQ6jVKJRn81BiS4=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
|
||||
|
|
@ -245,21 +247,16 @@ github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
|
|||
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
|
||||
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
|
||||
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
|
||||
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
|
||||
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
|
||||
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
|
||||
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg=
|
||||
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
|
||||
go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8=
|
||||
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
|
||||
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
|
||||
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
|
||||
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
|
||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
|
|
@ -302,12 +299,8 @@ golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhS
|
|||
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
|
||||
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
|
||||
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
|
||||
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
|
||||
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
|
||||
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package coredeno
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
|
|
@ -12,8 +13,13 @@ import (
|
|||
|
||||
// Options configures the CoreDeno sidecar.
|
||||
type Options struct {
|
||||
DenoPath string // path to deno binary (default: "deno")
|
||||
SocketPath string // Unix socket path for gRPC
|
||||
DenoPath string // path to deno binary (default: "deno")
|
||||
SocketPath string // Unix socket path for Go's gRPC server (CoreService)
|
||||
DenoSocketPath string // Unix socket path for Deno's gRPC server (DenoService)
|
||||
AppRoot string // app root directory (sandboxed I/O)
|
||||
StoreDBPath string // SQLite DB path (default: AppRoot/.core/store.db)
|
||||
PublicKey ed25519.PublicKey // ed25519 public key for manifest verification (optional)
|
||||
SidecarArgs []string // args passed to the sidecar process
|
||||
}
|
||||
|
||||
// Permissions declares per-module Deno permission flags.
|
||||
|
|
@ -69,5 +75,11 @@ func NewSidecar(opts Options) *Sidecar {
|
|||
if opts.SocketPath == "" {
|
||||
opts.SocketPath = DefaultSocketPath()
|
||||
}
|
||||
if opts.DenoSocketPath == "" && opts.SocketPath != "" {
|
||||
opts.DenoSocketPath = filepath.Join(filepath.Dir(opts.SocketPath), "deno.sock")
|
||||
}
|
||||
if opts.StoreDBPath == "" && opts.AppRoot != "" {
|
||||
opts.StoreDBPath = filepath.Join(opts.AppRoot, ".core", "store.db")
|
||||
}
|
||||
return &Sidecar{opts: opts}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,34 @@ func TestSidecar_PermissionFlags_Empty(t *testing.T) {
|
|||
assert.Empty(t, flags)
|
||||
}
|
||||
|
||||
func TestOptions_AppRoot_Good(t *testing.T) {
|
||||
opts := Options{
|
||||
DenoPath: "deno",
|
||||
SocketPath: "/tmp/test.sock",
|
||||
AppRoot: "/app",
|
||||
StoreDBPath: "/app/.core/store.db",
|
||||
}
|
||||
sc := NewSidecar(opts)
|
||||
assert.Equal(t, "/app", sc.opts.AppRoot)
|
||||
assert.Equal(t, "/app/.core/store.db", sc.opts.StoreDBPath)
|
||||
}
|
||||
|
||||
func TestOptions_StoreDBPath_Default_Good(t *testing.T) {
|
||||
opts := Options{AppRoot: "/app"}
|
||||
sc := NewSidecar(opts)
|
||||
assert.Equal(t, "/app/.core/store.db", sc.opts.StoreDBPath,
|
||||
"StoreDBPath should default to AppRoot/.core/store.db")
|
||||
}
|
||||
|
||||
func TestOptions_SidecarArgs_Good(t *testing.T) {
|
||||
opts := Options{
|
||||
DenoPath: "deno",
|
||||
SidecarArgs: []string{"run", "--allow-env", "main.ts"},
|
||||
}
|
||||
sc := NewSidecar(opts)
|
||||
assert.Equal(t, []string{"run", "--allow-env", "main.ts"}, sc.opts.SidecarArgs)
|
||||
}
|
||||
|
||||
func TestDefaultSocketPath_XDG(t *testing.T) {
|
||||
orig := os.Getenv("XDG_RUNTIME_DIR")
|
||||
defer os.Setenv("XDG_RUNTIME_DIR", orig)
|
||||
|
|
@ -52,3 +80,20 @@ func TestDefaultSocketPath_XDG(t *testing.T) {
|
|||
path := DefaultSocketPath()
|
||||
assert.Equal(t, "/run/user/1000/core/deno.sock", path)
|
||||
}
|
||||
|
||||
func TestOptions_DenoSocketPath_Default_Good(t *testing.T) {
|
||||
opts := Options{SocketPath: "/tmp/core/core.sock"}
|
||||
sc := NewSidecar(opts)
|
||||
assert.Equal(t, "/tmp/core/deno.sock", sc.opts.DenoSocketPath,
|
||||
"DenoSocketPath should default to same dir as SocketPath with deno.sock")
|
||||
}
|
||||
|
||||
func TestOptions_DenoSocketPath_Explicit_Good(t *testing.T) {
|
||||
opts := Options{
|
||||
SocketPath: "/tmp/core/core.sock",
|
||||
DenoSocketPath: "/tmp/custom/deno.sock",
|
||||
}
|
||||
sc := NewSidecar(opts)
|
||||
assert.Equal(t, "/tmp/custom/deno.sock", sc.opts.DenoSocketPath,
|
||||
"Explicit DenoSocketPath should not be overridden")
|
||||
}
|
||||
|
|
|
|||
135
pkg/coredeno/denoclient.go
Normal file
135
pkg/coredeno/denoclient.go
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
package coredeno
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// DenoClient communicates with the Deno sidecar's JSON-RPC server over a Unix socket.
|
||||
// Thread-safe: uses a mutex to serialize requests (one connection, request/response protocol).
|
||||
type DenoClient struct {
|
||||
mu sync.Mutex
|
||||
conn net.Conn
|
||||
reader *bufio.Reader
|
||||
}
|
||||
|
||||
// DialDeno connects to the Deno JSON-RPC server on the given Unix socket path.
|
||||
func DialDeno(socketPath string) (*DenoClient, error) {
|
||||
conn, err := net.Dial("unix", socketPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("deno dial: %w", err)
|
||||
}
|
||||
return &DenoClient{
|
||||
conn: conn,
|
||||
reader: bufio.NewReader(conn),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying connection.
|
||||
func (c *DenoClient) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *DenoClient) call(req map[string]any) (map[string]any, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
data, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal: %w", err)
|
||||
}
|
||||
data = append(data, '\n')
|
||||
|
||||
if _, err := c.conn.Write(data); err != nil {
|
||||
return nil, fmt.Errorf("write: %w", err)
|
||||
}
|
||||
|
||||
line, err := c.reader.ReadBytes('\n')
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read: %w", err)
|
||||
}
|
||||
|
||||
var resp map[string]any
|
||||
if err := json.Unmarshal(line, &resp); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal: %w", err)
|
||||
}
|
||||
|
||||
if errMsg, ok := resp["error"].(string); ok && errMsg != "" {
|
||||
return nil, fmt.Errorf("deno: %s", errMsg)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// ModulePermissions declares per-module permission scopes for Deno Worker sandboxing.
|
||||
type ModulePermissions struct {
|
||||
Read []string `json:"read,omitempty"`
|
||||
Write []string `json:"write,omitempty"`
|
||||
Net []string `json:"net,omitempty"`
|
||||
Run []string `json:"run,omitempty"`
|
||||
}
|
||||
|
||||
// LoadModuleResponse holds the result of a LoadModule call.
|
||||
type LoadModuleResponse struct {
|
||||
Ok bool
|
||||
Error string
|
||||
}
|
||||
|
||||
// LoadModule tells Deno to load a module with the given permissions.
|
||||
func (c *DenoClient) LoadModule(code, entryPoint string, perms ModulePermissions) (*LoadModuleResponse, error) {
|
||||
resp, err := c.call(map[string]any{
|
||||
"method": "LoadModule",
|
||||
"code": code,
|
||||
"entry_point": entryPoint,
|
||||
"permissions": perms,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &LoadModuleResponse{
|
||||
Ok: resp["ok"] == true,
|
||||
Error: fmt.Sprint(resp["error"]),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// UnloadModuleResponse holds the result of an UnloadModule call.
|
||||
type UnloadModuleResponse struct {
|
||||
Ok bool
|
||||
}
|
||||
|
||||
// UnloadModule tells Deno to unload a module.
|
||||
func (c *DenoClient) UnloadModule(code string) (*UnloadModuleResponse, error) {
|
||||
resp, err := c.call(map[string]any{
|
||||
"method": "UnloadModule",
|
||||
"code": code,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &UnloadModuleResponse{
|
||||
Ok: resp["ok"] == true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ModuleStatusResponse holds the result of a ModuleStatus call.
|
||||
type ModuleStatusResponse struct {
|
||||
Code string
|
||||
Status string
|
||||
}
|
||||
|
||||
// ModuleStatus queries the status of a module in the Deno runtime.
|
||||
func (c *DenoClient) ModuleStatus(code string) (*ModuleStatusResponse, error) {
|
||||
resp, err := c.call(map[string]any{
|
||||
"method": "ModuleStatus",
|
||||
"code": code,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ModuleStatusResponse{
|
||||
Code: fmt.Sprint(resp["code"]),
|
||||
Status: fmt.Sprint(resp["status"]),
|
||||
}, nil
|
||||
}
|
||||
499
pkg/coredeno/integration_test.go
Normal file
499
pkg/coredeno/integration_test.go
Normal file
|
|
@ -0,0 +1,499 @@
|
|||
//go:build integration
|
||||
|
||||
package coredeno
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
core "forge.lthn.ai/core/go/pkg/framework/core"
|
||||
"forge.lthn.ai/core/go/pkg/marketplace"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
// unused import guard
|
||||
var _ = pb.NewCoreServiceClient
|
||||
|
||||
func findDeno(t *testing.T) string {
|
||||
t.Helper()
|
||||
denoPath, err := exec.LookPath("deno")
|
||||
if err != nil {
|
||||
home, _ := os.UserHomeDir()
|
||||
denoPath = filepath.Join(home, ".deno", "bin", "deno")
|
||||
if _, err := os.Stat(denoPath); err != nil {
|
||||
t.Skip("deno not installed")
|
||||
}
|
||||
}
|
||||
return denoPath
|
||||
}
|
||||
|
||||
// runtimeEntryPoint returns the absolute path to runtime/main.ts.
|
||||
func runtimeEntryPoint(t *testing.T) string {
|
||||
t.Helper()
|
||||
// We're in pkg/coredeno/ during test, runtime is a subdir
|
||||
abs, err := filepath.Abs("runtime/main.ts")
|
||||
require.NoError(t, err)
|
||||
require.FileExists(t, abs)
|
||||
return abs
|
||||
}
|
||||
|
||||
// testModulePath returns the absolute path to runtime/testdata/test-module.ts.
|
||||
func testModulePath(t *testing.T) string {
|
||||
t.Helper()
|
||||
abs, err := filepath.Abs("runtime/testdata/test-module.ts")
|
||||
require.NoError(t, err)
|
||||
require.FileExists(t, abs)
|
||||
return abs
|
||||
}
|
||||
|
||||
func TestIntegration_FullBoot_Good(t *testing.T) {
|
||||
denoPath := findDeno(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
|
||||
// Write a manifest
|
||||
coreDir := filepath.Join(tmpDir, ".core")
|
||||
require.NoError(t, os.MkdirAll(coreDir, 0755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(`
|
||||
code: integration-test
|
||||
name: Integration Test
|
||||
version: "1.0"
|
||||
permissions:
|
||||
read: ["./data/"]
|
||||
`), 0644))
|
||||
|
||||
entryPoint := runtimeEntryPoint(t)
|
||||
|
||||
opts := Options{
|
||||
DenoPath: denoPath,
|
||||
SocketPath: sockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
SidecarArgs: []string{"run", "-A", entryPoint},
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(opts)
|
||||
result, err := factory(c)
|
||||
require.NoError(t, err)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = svc.OnStartup(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify gRPC is working
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(sockPath)
|
||||
return err == nil
|
||||
}, 5*time.Second, 50*time.Millisecond, "socket should appear")
|
||||
|
||||
conn, err := grpc.NewClient(
|
||||
"unix://"+sockPath,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
client := pb.NewCoreServiceClient(conn)
|
||||
_, err = client.StoreSet(ctx, &pb.StoreSetRequest{
|
||||
Group: "integration", Key: "boot", Value: "ok",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.StoreGet(ctx, &pb.StoreGetRequest{
|
||||
Group: "integration", Key: "boot",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "ok", resp.Value)
|
||||
assert.True(t, resp.Found)
|
||||
|
||||
// Verify sidecar is running
|
||||
assert.True(t, svc.sidecar.IsRunning(), "Deno sidecar should be running")
|
||||
|
||||
// Clean shutdown
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
|
||||
}
|
||||
|
||||
func TestIntegration_Tier2_Bidirectional_Good(t *testing.T) {
|
||||
denoPath := findDeno(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
denoSockPath := filepath.Join(tmpDir, "deno.sock")
|
||||
|
||||
// Write a manifest
|
||||
coreDir := filepath.Join(tmpDir, ".core")
|
||||
require.NoError(t, os.MkdirAll(coreDir, 0755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(`
|
||||
code: tier2-test
|
||||
name: Tier 2 Test
|
||||
version: "1.0"
|
||||
permissions:
|
||||
read: ["./data/"]
|
||||
run: ["echo"]
|
||||
`), 0644))
|
||||
|
||||
entryPoint := runtimeEntryPoint(t)
|
||||
|
||||
opts := Options{
|
||||
DenoPath: denoPath,
|
||||
SocketPath: sockPath,
|
||||
DenoSocketPath: denoSockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
SidecarArgs: []string{"run", "-A", "--unstable-worker-options", entryPoint},
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(opts)
|
||||
result, err := factory(c)
|
||||
require.NoError(t, err)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = svc.OnStartup(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify both sockets appeared
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(sockPath)
|
||||
return err == nil
|
||||
}, 10*time.Second, 50*time.Millisecond, "core socket should appear")
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(denoSockPath)
|
||||
return err == nil
|
||||
}, 10*time.Second, 50*time.Millisecond, "deno socket should appear")
|
||||
|
||||
// Verify sidecar is running
|
||||
assert.True(t, svc.sidecar.IsRunning(), "Deno sidecar should be running")
|
||||
|
||||
// Verify DenoClient is connected
|
||||
require.NotNil(t, svc.DenoClient(), "DenoClient should be connected")
|
||||
|
||||
// Test Go → Deno: LoadModule with real Worker
|
||||
modPath := testModulePath(t)
|
||||
loadResp, err := svc.DenoClient().LoadModule("test-module", modPath, ModulePermissions{
|
||||
Read: []string{filepath.Dir(modPath) + "/"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, loadResp.Ok)
|
||||
|
||||
// Wait for module to finish loading (async Worker init)
|
||||
require.Eventually(t, func() bool {
|
||||
resp, err := svc.DenoClient().ModuleStatus("test-module")
|
||||
return err == nil && (resp.Status == "RUNNING" || resp.Status == "ERRORED")
|
||||
}, 5*time.Second, 50*time.Millisecond, "module should finish loading")
|
||||
|
||||
statusResp, err := svc.DenoClient().ModuleStatus("test-module")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "test-module", statusResp.Code)
|
||||
assert.Equal(t, "RUNNING", statusResp.Status)
|
||||
|
||||
// Test Go → Deno: UnloadModule
|
||||
unloadResp, err := svc.DenoClient().UnloadModule("test-module")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, unloadResp.Ok)
|
||||
|
||||
// Verify module is now STOPPED
|
||||
statusResp2, err := svc.DenoClient().ModuleStatus("test-module")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "STOPPED", statusResp2.Status)
|
||||
|
||||
// Verify CoreService gRPC still works (Deno wrote health check data)
|
||||
conn, err := grpc.NewClient(
|
||||
"unix://"+sockPath,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
coreClient := pb.NewCoreServiceClient(conn)
|
||||
getResp, err := coreClient.StoreGet(ctx, &pb.StoreGetRequest{
|
||||
Group: "_coredeno", Key: "status",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, getResp.Found)
|
||||
assert.Equal(t, "connected", getResp.Value, "Deno should have written health check")
|
||||
|
||||
// Clean shutdown
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
|
||||
}
|
||||
|
||||
func TestIntegration_Tier3_WorkerIsolation_Good(t *testing.T) {
|
||||
denoPath := findDeno(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
denoSockPath := filepath.Join(tmpDir, "deno.sock")
|
||||
|
||||
// Write a manifest
|
||||
coreDir := filepath.Join(tmpDir, ".core")
|
||||
require.NoError(t, os.MkdirAll(coreDir, 0755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(`
|
||||
code: tier3-test
|
||||
name: Tier 3 Test
|
||||
version: "1.0"
|
||||
permissions:
|
||||
read: ["./data/"]
|
||||
`), 0644))
|
||||
|
||||
entryPoint := runtimeEntryPoint(t)
|
||||
modPath := testModulePath(t)
|
||||
|
||||
opts := Options{
|
||||
DenoPath: denoPath,
|
||||
SocketPath: sockPath,
|
||||
DenoSocketPath: denoSockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
SidecarArgs: []string{"run", "-A", "--unstable-worker-options", entryPoint},
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(opts)
|
||||
result, err := factory(c)
|
||||
require.NoError(t, err)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = svc.OnStartup(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify both sockets appeared
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(denoSockPath)
|
||||
return err == nil
|
||||
}, 10*time.Second, 50*time.Millisecond, "deno socket should appear")
|
||||
|
||||
require.NotNil(t, svc.DenoClient(), "DenoClient should be connected")
|
||||
|
||||
// Load a real module — it writes to store via I/O bridge
|
||||
loadResp, err := svc.DenoClient().LoadModule("test-mod", modPath, ModulePermissions{
|
||||
Read: []string{filepath.Dir(modPath) + "/"},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, loadResp.Ok)
|
||||
|
||||
// Wait for module to reach RUNNING (Worker init + init() completes)
|
||||
require.Eventually(t, func() bool {
|
||||
resp, err := svc.DenoClient().ModuleStatus("test-mod")
|
||||
return err == nil && resp.Status == "RUNNING"
|
||||
}, 10*time.Second, 100*time.Millisecond, "module should be RUNNING")
|
||||
|
||||
// Verify the module wrote to the store via the I/O bridge
|
||||
// Module calls: core.storeSet("test-module", "init", "ok")
|
||||
conn, err := grpc.NewClient(
|
||||
"unix://"+sockPath,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
coreClient := pb.NewCoreServiceClient(conn)
|
||||
|
||||
// Poll for the store value — module init is async
|
||||
require.Eventually(t, func() bool {
|
||||
resp, err := coreClient.StoreGet(ctx, &pb.StoreGetRequest{
|
||||
Group: "test-module", Key: "init",
|
||||
})
|
||||
return err == nil && resp.Found && resp.Value == "ok"
|
||||
}, 5*time.Second, 100*time.Millisecond, "module should have written to store via I/O bridge")
|
||||
|
||||
// Unload and verify
|
||||
unloadResp, err := svc.DenoClient().UnloadModule("test-mod")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, unloadResp.Ok)
|
||||
|
||||
statusResp, err := svc.DenoClient().ModuleStatus("test-mod")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "STOPPED", statusResp.Status)
|
||||
|
||||
// Clean shutdown
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
|
||||
}
|
||||
|
||||
// createModuleRepo creates a git repo containing a test module with manifest + main.ts.
|
||||
// The module's init() writes to the store to prove the I/O bridge works.
|
||||
func createModuleRepo(t *testing.T, code string) string {
|
||||
t.Helper()
|
||||
dir := filepath.Join(t.TempDir(), code+"-repo")
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, ".core"), 0755))
|
||||
|
||||
require.NoError(t, os.WriteFile(filepath.Join(dir, ".core", "view.yml"), []byte(`
|
||||
code: `+code+`
|
||||
name: Test Module `+code+`
|
||||
version: "1.0"
|
||||
permissions:
|
||||
read: ["./"]
|
||||
`), 0644))
|
||||
|
||||
// Module that writes to store to prove it ran
|
||||
require.NoError(t, os.WriteFile(filepath.Join(dir, "main.ts"), []byte(`
|
||||
export async function init(core: any) {
|
||||
await core.storeSet("`+code+`", "installed", "yes");
|
||||
}
|
||||
`), 0644))
|
||||
|
||||
gitCmd := func(args ...string) {
|
||||
t.Helper()
|
||||
cmd := exec.Command("git", append([]string{
|
||||
"-C", dir, "-c", "user.email=test@test.com", "-c", "user.name=test",
|
||||
}, args...)...)
|
||||
out, err := cmd.CombinedOutput()
|
||||
require.NoError(t, err, "git %v: %s", args, string(out))
|
||||
}
|
||||
gitCmd("init")
|
||||
gitCmd("add", ".")
|
||||
gitCmd("commit", "-m", "init")
|
||||
|
||||
return dir
|
||||
}
|
||||
|
||||
func TestIntegration_Tier4_MarketplaceInstall_Good(t *testing.T) {
|
||||
denoPath := findDeno(t)
|
||||
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
denoSockPath := filepath.Join(tmpDir, "deno.sock")
|
||||
|
||||
// Write app manifest
|
||||
coreDir := filepath.Join(tmpDir, ".core")
|
||||
require.NoError(t, os.MkdirAll(coreDir, 0755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(`
|
||||
code: tier4-test
|
||||
name: Tier 4 Test
|
||||
version: "1.0"
|
||||
permissions:
|
||||
read: ["./"]
|
||||
`), 0644))
|
||||
|
||||
entryPoint := runtimeEntryPoint(t)
|
||||
|
||||
opts := Options{
|
||||
DenoPath: denoPath,
|
||||
SocketPath: sockPath,
|
||||
DenoSocketPath: denoSockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
SidecarArgs: []string{"run", "-A", "--unstable-worker-options", entryPoint},
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(opts)
|
||||
result, err := factory(c)
|
||||
require.NoError(t, err)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err = svc.OnStartup(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify sidecar and Deno client are up
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(denoSockPath)
|
||||
return err == nil
|
||||
}, 10*time.Second, 50*time.Millisecond, "deno socket should appear")
|
||||
|
||||
require.NotNil(t, svc.DenoClient(), "DenoClient should be connected")
|
||||
require.NotNil(t, svc.Installer(), "Installer should be available")
|
||||
|
||||
// Create a test module repo and install it
|
||||
moduleRepo := createModuleRepo(t, "market-mod")
|
||||
err = svc.Installer().Install(ctx, marketplace.Module{
|
||||
Code: "market-mod",
|
||||
Repo: moduleRepo,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the module was installed on disk
|
||||
modulesDir := filepath.Join(tmpDir, "modules", "market-mod")
|
||||
require.DirExists(t, modulesDir)
|
||||
|
||||
// Verify Installed() returns it
|
||||
installed, err := svc.Installer().Installed()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, installed, 1)
|
||||
assert.Equal(t, "market-mod", installed[0].Code)
|
||||
assert.Equal(t, "1.0", installed[0].Version)
|
||||
|
||||
// Load the installed module into the Deno runtime
|
||||
mod := installed[0]
|
||||
loadResp, err := svc.DenoClient().LoadModule(mod.Code, mod.EntryPoint, ModulePermissions{
|
||||
Read: mod.Permissions.Read,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, loadResp.Ok)
|
||||
|
||||
// Wait for module to reach RUNNING
|
||||
require.Eventually(t, func() bool {
|
||||
resp, err := svc.DenoClient().ModuleStatus("market-mod")
|
||||
return err == nil && resp.Status == "RUNNING"
|
||||
}, 10*time.Second, 100*time.Millisecond, "installed module should be RUNNING")
|
||||
|
||||
// Verify the module wrote to the store via I/O bridge
|
||||
conn, err := grpc.NewClient(
|
||||
"unix://"+sockPath,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
coreClient := pb.NewCoreServiceClient(conn)
|
||||
require.Eventually(t, func() bool {
|
||||
resp, err := coreClient.StoreGet(ctx, &pb.StoreGetRequest{
|
||||
Group: "market-mod", Key: "installed",
|
||||
})
|
||||
return err == nil && resp.Found && resp.Value == "yes"
|
||||
}, 5*time.Second, 100*time.Millisecond, "installed module should have written to store via I/O bridge")
|
||||
|
||||
// Unload and remove
|
||||
unloadResp, err := svc.DenoClient().UnloadModule("market-mod")
|
||||
require.NoError(t, err)
|
||||
assert.True(t, unloadResp.Ok)
|
||||
|
||||
err = svc.Installer().Remove("market-mod")
|
||||
require.NoError(t, err)
|
||||
assert.NoDirExists(t, modulesDir, "module directory should be removed")
|
||||
|
||||
installed2, err := svc.Installer().Installed()
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, installed2, "no modules should be installed after remove")
|
||||
|
||||
// Clean shutdown
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, svc.sidecar.IsRunning(), "Deno sidecar should be stopped")
|
||||
}
|
||||
|
|
@ -23,11 +23,17 @@ func (s *Sidecar) Start(ctx context.Context, args ...string) error {
|
|||
return fmt.Errorf("coredeno: mkdir %s: %w", sockDir, err)
|
||||
}
|
||||
|
||||
// Remove stale socket
|
||||
os.Remove(s.opts.SocketPath)
|
||||
// Remove stale Deno socket (the Core socket is managed by ListenGRPC)
|
||||
if s.opts.DenoSocketPath != "" {
|
||||
os.Remove(s.opts.DenoSocketPath)
|
||||
}
|
||||
|
||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||
s.cmd = exec.CommandContext(s.ctx, s.opts.DenoPath, args...)
|
||||
s.cmd.Env = append(os.Environ(),
|
||||
"CORE_SOCKET="+s.opts.SocketPath,
|
||||
"DENO_SOCKET="+s.opts.DenoSocketPath,
|
||||
)
|
||||
s.done = make(chan struct{})
|
||||
if err := s.cmd.Start(); err != nil {
|
||||
s.cmd = nil
|
||||
|
|
|
|||
|
|
@ -36,6 +36,74 @@ func TestStop_Good_NotStarted(t *testing.T) {
|
|||
assert.NoError(t, err, "stopping a not-started sidecar should be a no-op")
|
||||
}
|
||||
|
||||
func TestStart_Good_EnvPassedToChild(t *testing.T) {
|
||||
sockDir := t.TempDir()
|
||||
sockPath := filepath.Join(sockDir, "test.sock")
|
||||
|
||||
sc := NewSidecar(Options{
|
||||
DenoPath: "sleep",
|
||||
SocketPath: sockPath,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := sc.Start(ctx, "10")
|
||||
require.NoError(t, err)
|
||||
defer sc.Stop()
|
||||
|
||||
// Verify the child process has CORE_SOCKET in its environment
|
||||
sc.mu.RLock()
|
||||
env := sc.cmd.Env
|
||||
sc.mu.RUnlock()
|
||||
|
||||
found := false
|
||||
expected := "CORE_SOCKET=" + sockPath
|
||||
for _, e := range env {
|
||||
if e == expected {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.True(t, found, "child process should receive CORE_SOCKET=%s", sockPath)
|
||||
}
|
||||
|
||||
func TestStart_Good_DenoSocketEnv(t *testing.T) {
|
||||
sockDir := t.TempDir()
|
||||
sockPath := filepath.Join(sockDir, "core.sock")
|
||||
denoSockPath := filepath.Join(sockDir, "deno.sock")
|
||||
|
||||
sc := NewSidecar(Options{
|
||||
DenoPath: "sleep",
|
||||
SocketPath: sockPath,
|
||||
DenoSocketPath: denoSockPath,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
err := sc.Start(ctx, "10")
|
||||
require.NoError(t, err)
|
||||
defer sc.Stop()
|
||||
|
||||
sc.mu.RLock()
|
||||
env := sc.cmd.Env
|
||||
sc.mu.RUnlock()
|
||||
|
||||
foundCore := false
|
||||
foundDeno := false
|
||||
for _, e := range env {
|
||||
if e == "CORE_SOCKET="+sockPath {
|
||||
foundCore = true
|
||||
}
|
||||
if e == "DENO_SOCKET="+denoSockPath {
|
||||
foundDeno = true
|
||||
}
|
||||
}
|
||||
assert.True(t, foundCore, "child should receive CORE_SOCKET")
|
||||
assert.True(t, foundDeno, "child should receive DENO_SOCKET")
|
||||
}
|
||||
|
||||
func TestSocketDirCreated_Good(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
sockPath := filepath.Join(dir, "sub", "deno.sock")
|
||||
|
|
|
|||
47
pkg/coredeno/listener.go
Normal file
47
pkg/coredeno/listener.go
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
package coredeno
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// ListenGRPC starts a gRPC server on a Unix socket, serving the CoreService.
|
||||
// It blocks until ctx is cancelled, then performs a graceful stop.
|
||||
func ListenGRPC(ctx context.Context, socketPath string, srv *Server) error {
|
||||
// Clean up stale socket
|
||||
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
listener, err := net.Listen("unix", socketPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = listener.Close()
|
||||
_ = os.Remove(socketPath)
|
||||
}()
|
||||
|
||||
gs := grpc.NewServer()
|
||||
pb.RegisterCoreServiceServer(gs, srv)
|
||||
|
||||
// Graceful stop when context cancelled
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
gs.GracefulStop()
|
||||
}()
|
||||
|
||||
if err := gs.Serve(listener); err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil // Expected shutdown
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
112
pkg/coredeno/listener_test.go
Normal file
112
pkg/coredeno/listener_test.go
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
package coredeno
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
"forge.lthn.ai/core/go/pkg/io"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
func TestListenGRPC_Good(t *testing.T) {
|
||||
sockDir := t.TempDir()
|
||||
sockPath := filepath.Join(sockDir, "test.sock")
|
||||
|
||||
medium := io.NewMockMedium()
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
srv := NewServer(medium, st)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- ListenGRPC(ctx, sockPath, srv)
|
||||
}()
|
||||
|
||||
// Wait for socket to appear
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(sockPath)
|
||||
return err == nil
|
||||
}, 2*time.Second, 10*time.Millisecond, "socket should appear")
|
||||
|
||||
// Connect as gRPC client
|
||||
conn, err := grpc.NewClient(
|
||||
"unix://"+sockPath,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
client := pb.NewCoreServiceClient(conn)
|
||||
|
||||
// StoreSet + StoreGet round-trip
|
||||
_, err = client.StoreSet(ctx, &pb.StoreSetRequest{
|
||||
Group: "test", Key: "k", Value: "v",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.StoreGet(ctx, &pb.StoreGetRequest{
|
||||
Group: "test", Key: "k",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, resp.Found)
|
||||
assert.Equal(t, "v", resp.Value)
|
||||
|
||||
// Cancel ctx to stop listener
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
assert.NoError(t, err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("listener did not stop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestListenGRPC_Bad_StaleSocket(t *testing.T) {
|
||||
sockDir := t.TempDir()
|
||||
sockPath := filepath.Join(sockDir, "stale.sock")
|
||||
|
||||
// Create a stale socket file
|
||||
require.NoError(t, os.WriteFile(sockPath, []byte("stale"), 0644))
|
||||
|
||||
medium := io.NewMockMedium()
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
srv := NewServer(medium, st)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- ListenGRPC(ctx, sockPath, srv)
|
||||
}()
|
||||
|
||||
// Should replace stale file and start listening
|
||||
require.Eventually(t, func() bool {
|
||||
info, err := os.Stat(sockPath)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
// Socket file type, not regular file
|
||||
return info.Mode()&os.ModeSocket != 0
|
||||
}, 2*time.Second, 10*time.Millisecond, "socket should replace stale file")
|
||||
|
||||
cancel()
|
||||
<-errCh
|
||||
}
|
||||
95
pkg/coredeno/runtime/client.ts
Normal file
95
pkg/coredeno/runtime/client.ts
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
// CoreService gRPC client — Deno calls Go for I/O operations.
|
||||
// All filesystem, store, and process operations route through this client.
|
||||
|
||||
import * as grpc from "@grpc/grpc-js";
|
||||
import * as protoLoader from "@grpc/proto-loader";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
const __dirname = dirname(fileURLToPath(import.meta.url));
|
||||
const PROTO_PATH = join(__dirname, "..", "proto", "coredeno.proto");
|
||||
|
||||
let packageDef: protoLoader.PackageDefinition | null = null;
|
||||
|
||||
function getProto(): any {
|
||||
if (!packageDef) {
|
||||
packageDef = protoLoader.loadSync(PROTO_PATH, {
|
||||
keepCase: true,
|
||||
longs: String,
|
||||
enums: String,
|
||||
defaults: true,
|
||||
oneofs: true,
|
||||
});
|
||||
}
|
||||
return grpc.loadPackageDefinition(packageDef).coredeno as any;
|
||||
}
|
||||
|
||||
export interface CoreClient {
|
||||
raw: any;
|
||||
storeGet(group: string, key: string): Promise<{ value: string; found: boolean }>;
|
||||
storeSet(group: string, key: string, value: string): Promise<{ ok: boolean }>;
|
||||
fileRead(path: string, moduleCode: string): Promise<{ content: string }>;
|
||||
fileWrite(path: string, content: string, moduleCode: string): Promise<{ ok: boolean }>;
|
||||
fileList(path: string, moduleCode: string): Promise<{ entries: Array<{ name: string; is_dir: boolean; size: number }> }>;
|
||||
fileDelete(path: string, moduleCode: string): Promise<{ ok: boolean }>;
|
||||
processStart(command: string, args: string[], moduleCode: string): Promise<{ process_id: string }>;
|
||||
processStop(processId: string): Promise<{ ok: boolean }>;
|
||||
close(): void;
|
||||
}
|
||||
|
||||
function promisify<T>(client: any, method: string, request: any): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
client[method](request, (err: Error | null, response: T) => {
|
||||
if (err) reject(err);
|
||||
else resolve(response);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function createCoreClient(socketPath: string): CoreClient {
|
||||
const proto = getProto();
|
||||
const client = new proto.CoreService(
|
||||
`unix:${socketPath}`,
|
||||
grpc.credentials.createInsecure(),
|
||||
);
|
||||
|
||||
return {
|
||||
raw: client,
|
||||
|
||||
storeGet(group: string, key: string) {
|
||||
return promisify(client, "StoreGet", { group, key });
|
||||
},
|
||||
|
||||
storeSet(group: string, key: string, value: string) {
|
||||
return promisify(client, "StoreSet", { group, key, value });
|
||||
},
|
||||
|
||||
fileRead(path: string, moduleCode: string) {
|
||||
return promisify(client, "FileRead", { path, module_code: moduleCode });
|
||||
},
|
||||
|
||||
fileWrite(path: string, content: string, moduleCode: string) {
|
||||
return promisify(client, "FileWrite", { path, content, module_code: moduleCode });
|
||||
},
|
||||
|
||||
fileList(path: string, moduleCode: string) {
|
||||
return promisify(client, "FileList", { path, module_code: moduleCode });
|
||||
},
|
||||
|
||||
fileDelete(path: string, moduleCode: string) {
|
||||
return promisify(client, "FileDelete", { path, module_code: moduleCode });
|
||||
},
|
||||
|
||||
processStart(command: string, args: string[], moduleCode: string) {
|
||||
return promisify(client, "ProcessStart", { command, args, module_code: moduleCode });
|
||||
},
|
||||
|
||||
processStop(processId: string) {
|
||||
return promisify(client, "ProcessStop", { process_id: processId });
|
||||
},
|
||||
|
||||
close() {
|
||||
client.close();
|
||||
},
|
||||
};
|
||||
}
|
||||
8
pkg/coredeno/runtime/deno.json
Normal file
8
pkg/coredeno/runtime/deno.json
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
{
|
||||
"imports": {
|
||||
"@grpc/grpc-js": "npm:@grpc/grpc-js@^1.12",
|
||||
"@grpc/proto-loader": "npm:@grpc/proto-loader@^0.7"
|
||||
},
|
||||
"nodeModulesDir": "none",
|
||||
"unstable": ["worker-options"]
|
||||
}
|
||||
193
pkg/coredeno/runtime/deno.lock
generated
Normal file
193
pkg/coredeno/runtime/deno.lock
generated
Normal file
|
|
@ -0,0 +1,193 @@
|
|||
{
|
||||
"version": "5",
|
||||
"specifiers": {
|
||||
"npm:@grpc/grpc-js@^1.12.0": "1.14.3",
|
||||
"npm:@grpc/proto-loader@0.7": "0.7.15"
|
||||
},
|
||||
"npm": {
|
||||
"@grpc/grpc-js@1.14.3": {
|
||||
"integrity": "sha512-Iq8QQQ/7X3Sac15oB6p0FmUg/klxQvXLeileoqrTRGJYLV+/9tubbr9ipz0GKHjmXVsgFPo/+W+2cA8eNcR+XA==",
|
||||
"dependencies": [
|
||||
"@grpc/proto-loader@0.8.0",
|
||||
"@js-sdsl/ordered-map"
|
||||
]
|
||||
},
|
||||
"@grpc/proto-loader@0.7.15": {
|
||||
"integrity": "sha512-tMXdRCfYVixjuFK+Hk0Q1s38gV9zDiDJfWL3h1rv4Qc39oILCu1TRTDt7+fGUI8K4G1Fj125Hx/ru3azECWTyQ==",
|
||||
"dependencies": [
|
||||
"lodash.camelcase",
|
||||
"long",
|
||||
"protobufjs",
|
||||
"yargs"
|
||||
],
|
||||
"bin": true
|
||||
},
|
||||
"@grpc/proto-loader@0.8.0": {
|
||||
"integrity": "sha512-rc1hOQtjIWGxcxpb9aHAfLpIctjEnsDehj0DAiVfBlmT84uvR0uUtN2hEi/ecvWVjXUGf5qPF4qEgiLOx1YIMQ==",
|
||||
"dependencies": [
|
||||
"lodash.camelcase",
|
||||
"long",
|
||||
"protobufjs",
|
||||
"yargs"
|
||||
],
|
||||
"bin": true
|
||||
},
|
||||
"@js-sdsl/ordered-map@4.4.2": {
|
||||
"integrity": "sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw=="
|
||||
},
|
||||
"@protobufjs/aspromise@1.1.2": {
|
||||
"integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ=="
|
||||
},
|
||||
"@protobufjs/base64@1.1.2": {
|
||||
"integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg=="
|
||||
},
|
||||
"@protobufjs/codegen@2.0.4": {
|
||||
"integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg=="
|
||||
},
|
||||
"@protobufjs/eventemitter@1.1.0": {
|
||||
"integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q=="
|
||||
},
|
||||
"@protobufjs/fetch@1.1.0": {
|
||||
"integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==",
|
||||
"dependencies": [
|
||||
"@protobufjs/aspromise",
|
||||
"@protobufjs/inquire"
|
||||
]
|
||||
},
|
||||
"@protobufjs/float@1.0.2": {
|
||||
"integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ=="
|
||||
},
|
||||
"@protobufjs/inquire@1.1.0": {
|
||||
"integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q=="
|
||||
},
|
||||
"@protobufjs/path@1.1.2": {
|
||||
"integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA=="
|
||||
},
|
||||
"@protobufjs/pool@1.1.0": {
|
||||
"integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw=="
|
||||
},
|
||||
"@protobufjs/utf8@1.1.0": {
|
||||
"integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw=="
|
||||
},
|
||||
"@types/node@25.2.3": {
|
||||
"integrity": "sha512-m0jEgYlYz+mDJZ2+F4v8D1AyQb+QzsNqRuI7xg1VQX/KlKS0qT9r1Mo16yo5F/MtifXFgaofIFsdFMox2SxIbQ==",
|
||||
"dependencies": [
|
||||
"undici-types"
|
||||
]
|
||||
},
|
||||
"ansi-regex@5.0.1": {
|
||||
"integrity": "sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ=="
|
||||
},
|
||||
"ansi-styles@4.3.0": {
|
||||
"integrity": "sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==",
|
||||
"dependencies": [
|
||||
"color-convert"
|
||||
]
|
||||
},
|
||||
"cliui@8.0.1": {
|
||||
"integrity": "sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==",
|
||||
"dependencies": [
|
||||
"string-width",
|
||||
"strip-ansi",
|
||||
"wrap-ansi"
|
||||
]
|
||||
},
|
||||
"color-convert@2.0.1": {
|
||||
"integrity": "sha512-RRECPsj7iu/xb5oKYcsFHSppFNnsj/52OVTRKb4zP5onXwVF3zVmmToNcOfGC+CRDpfK/U584fMg38ZHCaElKQ==",
|
||||
"dependencies": [
|
||||
"color-name"
|
||||
]
|
||||
},
|
||||
"color-name@1.1.4": {
|
||||
"integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA=="
|
||||
},
|
||||
"emoji-regex@8.0.0": {
|
||||
"integrity": "sha512-MSjYzcWNOA0ewAHpz0MxpYFvwg6yjy1NG3xteoqz644VCo/RPgnr1/GGt+ic3iJTzQ8Eu3TdM14SawnVUmGE6A=="
|
||||
},
|
||||
"escalade@3.2.0": {
|
||||
"integrity": "sha512-WUj2qlxaQtO4g6Pq5c29GTcWGDyd8itL8zTlipgECz3JesAiiOKotd8JU6otB3PACgG6xkJUyVhboMS+bje/jA=="
|
||||
},
|
||||
"get-caller-file@2.0.5": {
|
||||
"integrity": "sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg=="
|
||||
},
|
||||
"is-fullwidth-code-point@3.0.0": {
|
||||
"integrity": "sha512-zymm5+u+sCsSWyD9qNaejV3DFvhCKclKdizYaJUuHA83RLjb7nSuGnddCHGv0hk+KY7BMAlsWeK4Ueg6EV6XQg=="
|
||||
},
|
||||
"lodash.camelcase@4.3.0": {
|
||||
"integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA=="
|
||||
},
|
||||
"long@5.3.2": {
|
||||
"integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA=="
|
||||
},
|
||||
"protobufjs@7.5.4": {
|
||||
"integrity": "sha512-CvexbZtbov6jW2eXAvLukXjXUW1TzFaivC46BpWc/3BpcCysb5Vffu+B3XHMm8lVEuy2Mm4XGex8hBSg1yapPg==",
|
||||
"dependencies": [
|
||||
"@protobufjs/aspromise",
|
||||
"@protobufjs/base64",
|
||||
"@protobufjs/codegen",
|
||||
"@protobufjs/eventemitter",
|
||||
"@protobufjs/fetch",
|
||||
"@protobufjs/float",
|
||||
"@protobufjs/inquire",
|
||||
"@protobufjs/path",
|
||||
"@protobufjs/pool",
|
||||
"@protobufjs/utf8",
|
||||
"@types/node",
|
||||
"long"
|
||||
],
|
||||
"scripts": true
|
||||
},
|
||||
"require-directory@2.1.1": {
|
||||
"integrity": "sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q=="
|
||||
},
|
||||
"string-width@4.2.3": {
|
||||
"integrity": "sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==",
|
||||
"dependencies": [
|
||||
"emoji-regex",
|
||||
"is-fullwidth-code-point",
|
||||
"strip-ansi"
|
||||
]
|
||||
},
|
||||
"strip-ansi@6.0.1": {
|
||||
"integrity": "sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==",
|
||||
"dependencies": [
|
||||
"ansi-regex"
|
||||
]
|
||||
},
|
||||
"undici-types@7.16.0": {
|
||||
"integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw=="
|
||||
},
|
||||
"wrap-ansi@7.0.0": {
|
||||
"integrity": "sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==",
|
||||
"dependencies": [
|
||||
"ansi-styles",
|
||||
"string-width",
|
||||
"strip-ansi"
|
||||
]
|
||||
},
|
||||
"y18n@5.0.8": {
|
||||
"integrity": "sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA=="
|
||||
},
|
||||
"yargs-parser@21.1.1": {
|
||||
"integrity": "sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw=="
|
||||
},
|
||||
"yargs@17.7.2": {
|
||||
"integrity": "sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==",
|
||||
"dependencies": [
|
||||
"cliui",
|
||||
"escalade",
|
||||
"get-caller-file",
|
||||
"require-directory",
|
||||
"string-width",
|
||||
"y18n",
|
||||
"yargs-parser"
|
||||
]
|
||||
}
|
||||
},
|
||||
"workspace": {
|
||||
"dependencies": [
|
||||
"npm:@grpc/grpc-js@^1.12.0",
|
||||
"npm:@grpc/proto-loader@0.7"
|
||||
]
|
||||
}
|
||||
}
|
||||
106
pkg/coredeno/runtime/main.ts
Normal file
106
pkg/coredeno/runtime/main.ts
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
// CoreDeno Runtime Entry Point
|
||||
// Connects to CoreGO via gRPC over Unix socket.
|
||||
// Implements DenoService for module lifecycle management.
|
||||
|
||||
// Must be first import — patches http2 before @grpc/grpc-js loads.
|
||||
import "./polyfill.ts";
|
||||
|
||||
import { createCoreClient, type CoreClient } from "./client.ts";
|
||||
import { startDenoServer, type DenoServer } from "./server.ts";
|
||||
import { ModuleRegistry } from "./modules.ts";
|
||||
|
||||
// Read required environment variables
|
||||
const coreSocket = Deno.env.get("CORE_SOCKET");
|
||||
if (!coreSocket) {
|
||||
console.error("FATAL: CORE_SOCKET environment variable not set");
|
||||
Deno.exit(1);
|
||||
}
|
||||
|
||||
const denoSocket = Deno.env.get("DENO_SOCKET");
|
||||
if (!denoSocket) {
|
||||
console.error("FATAL: DENO_SOCKET environment variable not set");
|
||||
Deno.exit(1);
|
||||
}
|
||||
|
||||
console.error(`CoreDeno: CORE_SOCKET=${coreSocket}`);
|
||||
console.error(`CoreDeno: DENO_SOCKET=${denoSocket}`);
|
||||
|
||||
// 1. Create module registry
|
||||
const registry = new ModuleRegistry();
|
||||
|
||||
// 2. Start DenoService server (Go calls us here via JSON-RPC over Unix socket)
|
||||
let denoServer: DenoServer;
|
||||
try {
|
||||
denoServer = await startDenoServer(denoSocket, registry);
|
||||
console.error("CoreDeno: DenoService server started");
|
||||
} catch (err) {
|
||||
console.error(`FATAL: failed to start DenoService server: ${err}`);
|
||||
Deno.exit(1);
|
||||
}
|
||||
|
||||
// 3. Connect to CoreService (we call Go here) with retry
|
||||
let coreClient: CoreClient;
|
||||
{
|
||||
coreClient = createCoreClient(coreSocket);
|
||||
const maxRetries = 20;
|
||||
let connected = false;
|
||||
let lastErr: unknown;
|
||||
for (let i = 0; i < maxRetries; i++) {
|
||||
try {
|
||||
const timeoutCall = <T>(p: Promise<T>): Promise<T> =>
|
||||
Promise.race([
|
||||
p,
|
||||
new Promise<T>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("call timeout")), 2000),
|
||||
),
|
||||
]);
|
||||
await timeoutCall(
|
||||
coreClient.storeSet("_coredeno", "status", "connected"),
|
||||
);
|
||||
const resp = await timeoutCall(
|
||||
coreClient.storeGet("_coredeno", "status"),
|
||||
);
|
||||
if (resp.found && resp.value === "connected") {
|
||||
connected = true;
|
||||
break;
|
||||
}
|
||||
} catch (err) {
|
||||
lastErr = err;
|
||||
if (i < 3 || i === 9 || i === 19) {
|
||||
console.error(`CoreDeno: retry ${i}: ${err}`);
|
||||
}
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 250));
|
||||
}
|
||||
if (!connected) {
|
||||
console.error(
|
||||
`FATAL: failed to connect to CoreService after retries, last error: ${lastErr}`,
|
||||
);
|
||||
denoServer.close();
|
||||
Deno.exit(1);
|
||||
}
|
||||
console.error("CoreDeno: CoreService client connected");
|
||||
}
|
||||
|
||||
// 4. Inject CoreClient into registry for I/O bridge
|
||||
registry.setCoreClient(coreClient);
|
||||
|
||||
// 5. Signal readiness
|
||||
console.error("CoreDeno: ready");
|
||||
|
||||
// 6. Keep alive until SIGTERM
|
||||
const ac = new AbortController();
|
||||
Deno.addSignalListener("SIGTERM", () => {
|
||||
console.error("CoreDeno: shutting down");
|
||||
ac.abort();
|
||||
});
|
||||
|
||||
try {
|
||||
await new Promise((_resolve, reject) => {
|
||||
ac.signal.addEventListener("abort", () => reject(new Error("shutdown")));
|
||||
});
|
||||
} catch {
|
||||
// Clean shutdown
|
||||
coreClient.close();
|
||||
denoServer.close();
|
||||
}
|
||||
202
pkg/coredeno/runtime/modules.ts
Normal file
202
pkg/coredeno/runtime/modules.ts
Normal file
|
|
@ -0,0 +1,202 @@
|
|||
// Module registry — manages module lifecycle with Deno Worker isolation.
|
||||
// Each module runs in its own Worker with per-module permission sandboxing.
|
||||
// I/O bridge relays Worker postMessage calls to CoreService gRPC.
|
||||
|
||||
import type { CoreClient } from "./client.ts";
|
||||
|
||||
export type ModuleStatus =
|
||||
| "UNKNOWN"
|
||||
| "LOADING"
|
||||
| "RUNNING"
|
||||
| "STOPPED"
|
||||
| "ERRORED";
|
||||
|
||||
export interface ModulePermissions {
|
||||
read?: string[];
|
||||
write?: string[];
|
||||
net?: string[];
|
||||
run?: string[];
|
||||
}
|
||||
|
||||
interface Module {
|
||||
code: string;
|
||||
entryPoint: string;
|
||||
permissions: ModulePermissions;
|
||||
status: ModuleStatus;
|
||||
worker?: Worker;
|
||||
}
|
||||
|
||||
export class ModuleRegistry {
|
||||
private modules = new Map<string, Module>();
|
||||
private coreClient: CoreClient | null = null;
|
||||
private workerEntryUrl: string;
|
||||
|
||||
constructor() {
|
||||
this.workerEntryUrl = new URL("./worker-entry.ts", import.meta.url).href;
|
||||
}
|
||||
|
||||
setCoreClient(client: CoreClient): void {
|
||||
this.coreClient = client;
|
||||
}
|
||||
|
||||
load(code: string, entryPoint: string, permissions: ModulePermissions): void {
|
||||
// Terminate existing worker if reloading
|
||||
const existing = this.modules.get(code);
|
||||
if (existing?.worker) {
|
||||
existing.worker.terminate();
|
||||
}
|
||||
|
||||
const mod: Module = {
|
||||
code,
|
||||
entryPoint,
|
||||
permissions,
|
||||
status: "LOADING",
|
||||
};
|
||||
this.modules.set(code, mod);
|
||||
|
||||
// Resolve entry point URL for the module
|
||||
const moduleUrl =
|
||||
entryPoint.startsWith("file://") || entryPoint.startsWith("http")
|
||||
? entryPoint
|
||||
: "file://" + entryPoint;
|
||||
|
||||
// Build read permissions: worker-entry.ts dir + module source + declared reads
|
||||
const readPerms: string[] = [
|
||||
new URL(".", import.meta.url).pathname,
|
||||
];
|
||||
// Add the module's directory so it can be dynamically imported
|
||||
if (!entryPoint.startsWith("http")) {
|
||||
const modPath = entryPoint.startsWith("file://")
|
||||
? entryPoint.slice(7)
|
||||
: entryPoint;
|
||||
// Add the module file's directory
|
||||
const lastSlash = modPath.lastIndexOf("/");
|
||||
if (lastSlash > 0) readPerms.push(modPath.slice(0, lastSlash + 1));
|
||||
else readPerms.push(modPath);
|
||||
}
|
||||
if (permissions.read) readPerms.push(...permissions.read);
|
||||
|
||||
// Create Worker with permission sandbox
|
||||
const worker = new Worker(this.workerEntryUrl, {
|
||||
type: "module",
|
||||
name: code,
|
||||
// deno-lint-ignore no-explicit-any
|
||||
deno: {
|
||||
permissions: {
|
||||
read: readPerms,
|
||||
write: permissions.write ?? [],
|
||||
net: permissions.net ?? [],
|
||||
run: permissions.run ?? [],
|
||||
env: false,
|
||||
sys: false,
|
||||
ffi: false,
|
||||
},
|
||||
},
|
||||
} as any);
|
||||
|
||||
mod.worker = worker;
|
||||
|
||||
// I/O bridge: relay Worker RPC to CoreClient
|
||||
worker.onmessage = async (e: MessageEvent) => {
|
||||
const msg = e.data;
|
||||
|
||||
if (msg.type === "ready") {
|
||||
worker.postMessage({ type: "load", url: moduleUrl });
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "loaded") {
|
||||
mod.status = msg.ok ? "RUNNING" : "ERRORED";
|
||||
if (msg.ok) {
|
||||
console.error(`CoreDeno: module running: ${code}`);
|
||||
} else {
|
||||
console.error(`CoreDeno: module error: ${code}: ${msg.error}`);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "rpc" && this.coreClient) {
|
||||
try {
|
||||
const result = await this.dispatchRPC(
|
||||
code,
|
||||
msg.method,
|
||||
msg.params,
|
||||
);
|
||||
worker.postMessage({ type: "rpc_response", id: msg.id, result });
|
||||
} catch (err) {
|
||||
worker.postMessage({
|
||||
type: "rpc_response",
|
||||
id: msg.id,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
worker.onerror = (e: ErrorEvent) => {
|
||||
mod.status = "ERRORED";
|
||||
console.error(`CoreDeno: worker error: ${code}: ${e.message}`);
|
||||
};
|
||||
|
||||
console.error(`CoreDeno: module loading: ${code}`);
|
||||
}
|
||||
|
||||
private async dispatchRPC(
|
||||
moduleCode: string,
|
||||
method: string,
|
||||
params: Record<string, unknown>,
|
||||
): Promise<unknown> {
|
||||
const c = this.coreClient!;
|
||||
switch (method) {
|
||||
case "StoreGet":
|
||||
return c.storeGet(params.group as string, params.key as string);
|
||||
case "StoreSet":
|
||||
return c.storeSet(
|
||||
params.group as string,
|
||||
params.key as string,
|
||||
params.value as string,
|
||||
);
|
||||
case "FileRead":
|
||||
return c.fileRead(params.path as string, moduleCode);
|
||||
case "FileWrite":
|
||||
return c.fileWrite(
|
||||
params.path as string,
|
||||
params.content as string,
|
||||
moduleCode,
|
||||
);
|
||||
case "ProcessStart":
|
||||
return c.processStart(
|
||||
params.command as string,
|
||||
params.args as string[],
|
||||
moduleCode,
|
||||
);
|
||||
case "ProcessStop":
|
||||
return c.processStop(params.process_id as string);
|
||||
default:
|
||||
throw new Error(`unknown RPC method: ${method}`);
|
||||
}
|
||||
}
|
||||
|
||||
unload(code: string): boolean {
|
||||
const mod = this.modules.get(code);
|
||||
if (!mod) return false;
|
||||
if (mod.worker) {
|
||||
mod.worker.terminate();
|
||||
mod.worker = undefined;
|
||||
}
|
||||
mod.status = "STOPPED";
|
||||
console.error(`CoreDeno: module unloaded: ${code}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
status(code: string): ModuleStatus {
|
||||
return this.modules.get(code)?.status ?? "UNKNOWN";
|
||||
}
|
||||
|
||||
list(): Array<{ code: string; status: ModuleStatus }> {
|
||||
return Array.from(this.modules.values()).map((m) => ({
|
||||
code: m.code,
|
||||
status: m.status,
|
||||
}));
|
||||
}
|
||||
}
|
||||
94
pkg/coredeno/runtime/polyfill.ts
Normal file
94
pkg/coredeno/runtime/polyfill.ts
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
// Deno http2 + grpc-js polyfill — must be imported BEFORE @grpc/grpc-js.
|
||||
//
|
||||
// Two issues with Deno 2.x node compat:
|
||||
// 1. http2.getDefaultSettings throws "Not implemented"
|
||||
// 2. grpc-js's createConnection returns a socket that reports readyState="open"
|
||||
// but never emits "connect", causing http2 sessions to hang forever.
|
||||
// Fix: wrap createConnection to emit "connect" on next tick for open sockets.
|
||||
|
||||
import http2 from "node:http2";
|
||||
|
||||
// Fix 1: getDefaultSettings stub
|
||||
(http2 as any).getDefaultSettings = () => ({
|
||||
headerTableSize: 4096,
|
||||
enablePush: true,
|
||||
initialWindowSize: 65535,
|
||||
maxFrameSize: 16384,
|
||||
maxConcurrentStreams: 0xffffffff,
|
||||
maxHeaderListSize: 65535,
|
||||
maxHeaderSize: 65535,
|
||||
enableConnectProtocol: false,
|
||||
});
|
||||
|
||||
// Fix 2: grpc-js (transport.js line 536) passes an already-connected socket
|
||||
// to http2.connect via createConnection. Deno's http2 never completes the
|
||||
// HTTP/2 handshake because it expects a "connect" event from the socket,
|
||||
// which already fired. Emitting "connect" again causes "Busy: Unix socket
|
||||
// is currently in use" in Deno's internal http2.
|
||||
//
|
||||
// Workaround: track Unix socket paths via net.connect intercept, then in
|
||||
// createConnection, return a FRESH socket. Keep the original socket alive
|
||||
// (grpc-js has close listeners on it) but unused for data.
|
||||
import net from "node:net";
|
||||
|
||||
const socketPathMap = new WeakMap<net.Socket, string>();
|
||||
const origNetConnect = net.connect;
|
||||
(net as any).connect = function (...args: any[]) {
|
||||
const sock = origNetConnect.apply(this, args as any);
|
||||
if (args[0] && typeof args[0] === "object" && args[0].path) {
|
||||
socketPathMap.set(sock, args[0].path);
|
||||
}
|
||||
return sock;
|
||||
};
|
||||
|
||||
// Fix 3: Deno's http2 client never fires "remoteSettings" event, which
|
||||
// grpc-js waits for before marking the transport as READY.
|
||||
// Workaround: emit "remoteSettings" after "connect" with reasonable defaults.
|
||||
const origConnect = http2.connect;
|
||||
(http2 as any).connect = function (
|
||||
authority: any,
|
||||
options: any,
|
||||
...rest: any[]
|
||||
) {
|
||||
// For Unix sockets: replace pre-connected socket with fresh one
|
||||
if (options?.createConnection) {
|
||||
const origCC = options.createConnection;
|
||||
options = {
|
||||
...options,
|
||||
createConnection(...ccArgs: any[]) {
|
||||
const origSock = origCC.apply(this, ccArgs);
|
||||
const unixPath = socketPathMap.get(origSock);
|
||||
if (
|
||||
unixPath &&
|
||||
!origSock.connecting &&
|
||||
origSock.readyState === "open"
|
||||
) {
|
||||
const freshSock = net.connect({ path: unixPath });
|
||||
freshSock.on("close", () => origSock.destroy());
|
||||
return freshSock;
|
||||
}
|
||||
return origSock;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const session = origConnect.call(this, authority, options, ...rest);
|
||||
|
||||
// Emit remoteSettings after connect — Deno's http2 doesn't emit it
|
||||
session.once("connect", () => {
|
||||
if (!session.destroyed && !session.closed) {
|
||||
const settings = {
|
||||
headerTableSize: 4096,
|
||||
enablePush: false,
|
||||
initialWindowSize: 65535,
|
||||
maxFrameSize: 16384,
|
||||
maxConcurrentStreams: 100,
|
||||
maxHeaderListSize: 8192,
|
||||
maxHeaderSize: 8192,
|
||||
};
|
||||
process.nextTick(() => session.emit("remoteSettings", settings));
|
||||
}
|
||||
});
|
||||
|
||||
return session;
|
||||
};
|
||||
124
pkg/coredeno/runtime/server.ts
Normal file
124
pkg/coredeno/runtime/server.ts
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
// DenoService JSON-RPC server — Go calls Deno for module lifecycle management.
|
||||
// Uses length-prefixed JSON over raw Unix socket (Deno's http2 server is broken).
|
||||
// Protocol: 4-byte big-endian length + JSON payload, newline-delimited.
|
||||
|
||||
import { ModuleRegistry } from "./modules.ts";
|
||||
|
||||
export interface DenoServer {
|
||||
close(): void;
|
||||
}
|
||||
|
||||
export async function startDenoServer(
|
||||
socketPath: string,
|
||||
registry: ModuleRegistry,
|
||||
): Promise<DenoServer> {
|
||||
// Remove stale socket
|
||||
try {
|
||||
Deno.removeSync(socketPath);
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
|
||||
const listener = Deno.listen({ transport: "unix", path: socketPath });
|
||||
|
||||
const handleConnection = async (conn: Deno.UnixConn) => {
|
||||
const reader = conn.readable.getReader();
|
||||
const writer = conn.writable.getWriter();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
|
||||
// Process complete lines (newline-delimited JSON)
|
||||
let newlineIdx: number;
|
||||
while ((newlineIdx = buffer.indexOf("\n")) !== -1) {
|
||||
const line = buffer.slice(0, newlineIdx);
|
||||
buffer = buffer.slice(newlineIdx + 1);
|
||||
|
||||
if (!line.trim()) continue;
|
||||
|
||||
try {
|
||||
const req = JSON.parse(line);
|
||||
const resp = dispatch(req, registry);
|
||||
await writer.write(
|
||||
new TextEncoder().encode(JSON.stringify(resp) + "\n"),
|
||||
);
|
||||
} catch (err) {
|
||||
const errResp = {
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
};
|
||||
await writer.write(
|
||||
new TextEncoder().encode(JSON.stringify(errResp) + "\n"),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Connection closed or error — expected during shutdown
|
||||
} finally {
|
||||
try {
|
||||
writer.close();
|
||||
} catch {
|
||||
/* already closed */
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Accept connections in background
|
||||
const abortController = new AbortController();
|
||||
(async () => {
|
||||
try {
|
||||
for await (const conn of listener) {
|
||||
if (abortController.signal.aborted) break;
|
||||
handleConnection(conn);
|
||||
}
|
||||
} catch {
|
||||
// Listener closed
|
||||
}
|
||||
})();
|
||||
|
||||
return {
|
||||
close() {
|
||||
abortController.abort();
|
||||
listener.close();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
interface RPCRequest {
|
||||
method: string;
|
||||
code?: string;
|
||||
entry_point?: string;
|
||||
permissions?: { read?: string[]; write?: string[]; net?: string[]; run?: string[] };
|
||||
process_id?: string;
|
||||
}
|
||||
|
||||
function dispatch(
|
||||
req: RPCRequest,
|
||||
registry: ModuleRegistry,
|
||||
): Record<string, unknown> {
|
||||
switch (req.method) {
|
||||
case "LoadModule": {
|
||||
registry.load(
|
||||
req.code ?? "",
|
||||
req.entry_point ?? "",
|
||||
req.permissions ?? {},
|
||||
);
|
||||
return { ok: true, error: "" };
|
||||
}
|
||||
case "UnloadModule": {
|
||||
const ok = registry.unload(req.code ?? "");
|
||||
return { ok };
|
||||
}
|
||||
case "ModuleStatus": {
|
||||
return { code: req.code, status: registry.status(req.code ?? "") };
|
||||
}
|
||||
default:
|
||||
return { error: `unknown method: ${req.method}` };
|
||||
}
|
||||
}
|
||||
5
pkg/coredeno/runtime/testdata/test-module.ts
vendored
Normal file
5
pkg/coredeno/runtime/testdata/test-module.ts
vendored
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
// Test module — writes to store via I/O bridge to prove Workers work.
|
||||
// Called by integration tests.
|
||||
export async function init(core: any) {
|
||||
await core.storeSet("test-module", "init", "ok");
|
||||
}
|
||||
79
pkg/coredeno/runtime/worker-entry.ts
Normal file
79
pkg/coredeno/runtime/worker-entry.ts
Normal file
|
|
@ -0,0 +1,79 @@
|
|||
// Worker bootstrap — loaded as entry point for every module Worker.
|
||||
// Sets up the I/O bridge (postMessage ↔ parent relay), then dynamically
|
||||
// imports the module and calls its init(core) function.
|
||||
//
|
||||
// The parent (ModuleRegistry) injects module_code into all gRPC calls,
|
||||
// so modules can't spoof their identity.
|
||||
|
||||
// I/O bridge: request/response correlation over postMessage
|
||||
const pending = new Map<number, { resolve: Function; reject: Function }>();
|
||||
let nextId = 0;
|
||||
|
||||
function rpc(
|
||||
method: string,
|
||||
params: Record<string, unknown>,
|
||||
): Promise<unknown> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const id = ++nextId;
|
||||
pending.set(id, { resolve, reject });
|
||||
self.postMessage({ type: "rpc", id, method, params });
|
||||
});
|
||||
}
|
||||
|
||||
// Typed core object passed to module's init() function.
|
||||
// Each method maps to a CoreService gRPC call relayed through the parent.
|
||||
const core = {
|
||||
storeGet(group: string, key: string) {
|
||||
return rpc("StoreGet", { group, key });
|
||||
},
|
||||
storeSet(group: string, key: string, value: string) {
|
||||
return rpc("StoreSet", { group, key, value });
|
||||
},
|
||||
fileRead(path: string) {
|
||||
return rpc("FileRead", { path });
|
||||
},
|
||||
fileWrite(path: string, content: string) {
|
||||
return rpc("FileWrite", { path, content });
|
||||
},
|
||||
processStart(command: string, args: string[]) {
|
||||
return rpc("ProcessStart", { command, args });
|
||||
},
|
||||
processStop(processId: string) {
|
||||
return rpc("ProcessStop", { process_id: processId });
|
||||
},
|
||||
};
|
||||
|
||||
// Handle messages from parent: RPC responses and load commands
|
||||
self.addEventListener("message", async (e: MessageEvent) => {
|
||||
const msg = e.data;
|
||||
|
||||
if (msg.type === "rpc_response") {
|
||||
const p = pending.get(msg.id);
|
||||
if (p) {
|
||||
pending.delete(msg.id);
|
||||
if (msg.error) p.reject(new Error(msg.error));
|
||||
else p.resolve(msg.result);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "load") {
|
||||
try {
|
||||
const mod = await import(msg.url);
|
||||
if (typeof mod.init === "function") {
|
||||
await mod.init(core);
|
||||
}
|
||||
self.postMessage({ type: "loaded", ok: true });
|
||||
} catch (err) {
|
||||
self.postMessage({
|
||||
type: "loaded",
|
||||
ok: false,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
});
|
||||
|
||||
// Signal ready — parent will respond with {type: "load", url: "..."}
|
||||
self.postMessage({ type: "ready" });
|
||||
|
|
@ -8,8 +8,27 @@ import (
|
|||
"forge.lthn.ai/core/go/pkg/io"
|
||||
"forge.lthn.ai/core/go/pkg/manifest"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ProcessRunner abstracts process management for the gRPC server.
|
||||
// Satisfied by *process.Service.
|
||||
type ProcessRunner interface {
|
||||
Start(ctx context.Context, command string, args ...string) (ProcessHandle, error)
|
||||
Kill(id string) error
|
||||
}
|
||||
|
||||
// ProcessHandle is returned by ProcessRunner.Start.
|
||||
type ProcessHandle interface {
|
||||
Info() ProcessInfo
|
||||
}
|
||||
|
||||
// ProcessInfo is the subset of process info the server needs.
|
||||
type ProcessInfo struct {
|
||||
ID string
|
||||
}
|
||||
|
||||
// Server implements the CoreService gRPC interface with permission gating.
|
||||
// Every I/O request is checked against the calling module's declared permissions.
|
||||
type Server struct {
|
||||
|
|
@ -17,6 +36,7 @@ type Server struct {
|
|||
medium io.Medium
|
||||
store *store.Store
|
||||
manifests map[string]*manifest.Manifest
|
||||
processes ProcessRunner
|
||||
}
|
||||
|
||||
// NewServer creates a CoreService server backed by the given Medium and Store.
|
||||
|
|
@ -129,3 +149,38 @@ func (s *Server) StoreSet(_ context.Context, req *pb.StoreSetRequest) (*pb.Store
|
|||
}
|
||||
return &pb.StoreSetResponse{Ok: true}, nil
|
||||
}
|
||||
|
||||
// SetProcessRunner sets the process runner for ProcessStart/ProcessStop.
|
||||
func (s *Server) SetProcessRunner(pr ProcessRunner) {
|
||||
s.processes = pr
|
||||
}
|
||||
|
||||
// ProcessStart implements CoreService.ProcessStart with permission gating.
|
||||
func (s *Server) ProcessStart(ctx context.Context, req *pb.ProcessStartRequest) (*pb.ProcessStartResponse, error) {
|
||||
if s.processes == nil {
|
||||
return nil, status.Error(codes.Unimplemented, "process service not available")
|
||||
}
|
||||
m, err := s.getManifest(req.ModuleCode)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !CheckRun(req.Command, m.Permissions.Run) {
|
||||
return nil, fmt.Errorf("permission denied: %s cannot run %s", req.ModuleCode, req.Command)
|
||||
}
|
||||
proc, err := s.processes.Start(ctx, req.Command, req.Args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("process start: %w", err)
|
||||
}
|
||||
return &pb.ProcessStartResponse{ProcessId: proc.Info().ID}, nil
|
||||
}
|
||||
|
||||
// ProcessStop implements CoreService.ProcessStop.
|
||||
func (s *Server) ProcessStop(_ context.Context, req *pb.ProcessStopRequest) (*pb.ProcessStopResponse, error) {
|
||||
if s.processes == nil {
|
||||
return nil, status.Error(codes.Unimplemented, "process service not available")
|
||||
}
|
||||
if err := s.processes.Kill(req.ProcessId); err != nil {
|
||||
return nil, fmt.Errorf("process stop: %w", err)
|
||||
}
|
||||
return &pb.ProcessStopResponse{Ok: true}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,16 +2,48 @@ package coredeno
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
"forge.lthn.ai/core/go/pkg/io"
|
||||
"forge.lthn.ai/core/go/pkg/manifest"
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// mockProcessRunner implements ProcessRunner for testing.
|
||||
type mockProcessRunner struct {
|
||||
started map[string]bool
|
||||
nextID int
|
||||
}
|
||||
|
||||
func newMockProcessRunner() *mockProcessRunner {
|
||||
return &mockProcessRunner{started: make(map[string]bool)}
|
||||
}
|
||||
|
||||
func (m *mockProcessRunner) Start(_ context.Context, command string, args ...string) (ProcessHandle, error) {
|
||||
m.nextID++
|
||||
id := fmt.Sprintf("proc-%d", m.nextID)
|
||||
m.started[id] = true
|
||||
return &mockProcessHandle{id: id}, nil
|
||||
}
|
||||
|
||||
func (m *mockProcessRunner) Kill(id string) error {
|
||||
if !m.started[id] {
|
||||
return fmt.Errorf("process not found: %s", id)
|
||||
}
|
||||
delete(m.started, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
type mockProcessHandle struct{ id string }
|
||||
|
||||
func (h *mockProcessHandle) Info() ProcessInfo { return ProcessInfo{ID: h.id} }
|
||||
|
||||
func newTestServer(t *testing.T) *Server {
|
||||
t.Helper()
|
||||
medium := io.NewMockMedium()
|
||||
|
|
@ -95,3 +127,74 @@ func TestStoreGet_Good_NotFound(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
assert.False(t, resp.Found)
|
||||
}
|
||||
|
||||
func newTestServerWithProcess(t *testing.T) (*Server, *mockProcessRunner) {
|
||||
t.Helper()
|
||||
srv := newTestServer(t)
|
||||
srv.RegisterModule(&manifest.Manifest{
|
||||
Code: "runner-mod",
|
||||
Permissions: manifest.Permissions{
|
||||
Run: []string{"echo", "ls"},
|
||||
},
|
||||
})
|
||||
pr := newMockProcessRunner()
|
||||
srv.SetProcessRunner(pr)
|
||||
return srv, pr
|
||||
}
|
||||
|
||||
func TestProcessStart_Good(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
resp, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "echo", Args: []string{"hello"}, ModuleCode: "runner-mod",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, resp.ProcessId)
|
||||
}
|
||||
|
||||
func TestProcessStart_Bad_PermissionDenied(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
_, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "rm", Args: []string{"-rf", "/"}, ModuleCode: "runner-mod",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "permission denied")
|
||||
}
|
||||
|
||||
func TestProcessStart_Bad_NoProcessService(t *testing.T) {
|
||||
srv := newTestServer(t)
|
||||
srv.RegisterModule(&manifest.Manifest{
|
||||
Code: "no-proc-mod",
|
||||
Permissions: manifest.Permissions{Run: []string{"echo"}},
|
||||
})
|
||||
_, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "echo", ModuleCode: "no-proc-mod",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
st, ok := status.FromError(err)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, codes.Unimplemented, st.Code())
|
||||
}
|
||||
|
||||
func TestProcessStop_Good(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
// Start a process first
|
||||
startResp, err := srv.ProcessStart(context.Background(), &pb.ProcessStartRequest{
|
||||
Command: "echo", ModuleCode: "runner-mod",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Stop it
|
||||
resp, err := srv.ProcessStop(context.Background(), &pb.ProcessStopRequest{
|
||||
ProcessId: startResp.ProcessId,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, resp.Ok)
|
||||
}
|
||||
|
||||
func TestProcessStop_Bad_NotFound(t *testing.T) {
|
||||
srv, _ := newTestServerWithProcess(t)
|
||||
_, err := srv.ProcessStop(context.Background(), &pb.ProcessStopRequest{
|
||||
ProcessId: "nonexistent",
|
||||
})
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,8 +2,16 @@ package coredeno
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
core "forge.lthn.ai/core/go/pkg/framework/core"
|
||||
"forge.lthn.ai/core/go/pkg/io"
|
||||
"forge.lthn.ai/core/go/pkg/manifest"
|
||||
"forge.lthn.ai/core/go/pkg/marketplace"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
)
|
||||
|
||||
// Service wraps the CoreDeno sidecar as a framework service.
|
||||
|
|
@ -14,7 +22,13 @@ import (
|
|||
// core.New(core.WithService(coredeno.NewServiceFactory(opts)))
|
||||
type Service struct {
|
||||
*core.ServiceRuntime[Options]
|
||||
sidecar *Sidecar
|
||||
sidecar *Sidecar
|
||||
grpcServer *Server
|
||||
store *store.Store
|
||||
grpcCancel context.CancelFunc
|
||||
grpcDone chan error
|
||||
denoClient *DenoClient
|
||||
installer *marketplace.Installer
|
||||
}
|
||||
|
||||
// NewServiceFactory returns a factory function for framework registration via WithService.
|
||||
|
|
@ -27,17 +41,168 @@ func NewServiceFactory(opts Options) func(*core.Core) (any, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// OnStartup starts the Deno sidecar. Called by the framework on app startup.
|
||||
// OnStartup boots the CoreDeno subsystem. Called by the framework on app startup.
|
||||
//
|
||||
// Sequence: medium → store → server → manifest → gRPC listener → sidecar.
|
||||
func (s *Service) OnStartup(ctx context.Context) error {
|
||||
opts := s.Opts()
|
||||
|
||||
// 1. Create sandboxed Medium (or mock if no AppRoot)
|
||||
var medium io.Medium
|
||||
if opts.AppRoot != "" {
|
||||
var err error
|
||||
medium, err = io.NewSandboxed(opts.AppRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("coredeno: medium: %w", err)
|
||||
}
|
||||
} else {
|
||||
medium = io.NewMockMedium()
|
||||
}
|
||||
|
||||
// 2. Create Store
|
||||
dbPath := opts.StoreDBPath
|
||||
if dbPath == "" {
|
||||
dbPath = ":memory:"
|
||||
}
|
||||
var err error
|
||||
s.store, err = store.New(dbPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("coredeno: store: %w", err)
|
||||
}
|
||||
|
||||
// 3. Create gRPC Server
|
||||
s.grpcServer = NewServer(medium, s.store)
|
||||
|
||||
// 4. Load manifest if AppRoot set (non-fatal if missing)
|
||||
if opts.AppRoot != "" {
|
||||
m, loadErr := manifest.Load(medium, ".")
|
||||
if loadErr == nil && m != nil {
|
||||
if opts.PublicKey != nil {
|
||||
if ok, verr := manifest.Verify(m, opts.PublicKey); verr == nil && ok {
|
||||
s.grpcServer.RegisterModule(m)
|
||||
}
|
||||
} else {
|
||||
s.grpcServer.RegisterModule(m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 5. Start gRPC listener in background
|
||||
grpcCtx, grpcCancel := context.WithCancel(ctx)
|
||||
s.grpcCancel = grpcCancel
|
||||
s.grpcDone = make(chan error, 1)
|
||||
go func() {
|
||||
s.grpcDone <- ListenGRPC(grpcCtx, opts.SocketPath, s.grpcServer)
|
||||
}()
|
||||
|
||||
// 6. Start sidecar (if args provided)
|
||||
if len(opts.SidecarArgs) > 0 {
|
||||
// Wait for core socket so sidecar can connect to our gRPC server
|
||||
if err := waitForSocket(ctx, opts.SocketPath, 5*time.Second); err != nil {
|
||||
return fmt.Errorf("coredeno: core socket: %w", err)
|
||||
}
|
||||
|
||||
if err := s.sidecar.Start(ctx, opts.SidecarArgs...); err != nil {
|
||||
return fmt.Errorf("coredeno: sidecar: %w", err)
|
||||
}
|
||||
|
||||
// 7. Wait for Deno's server and connect as client
|
||||
if opts.DenoSocketPath != "" {
|
||||
if err := waitForSocket(ctx, opts.DenoSocketPath, 10*time.Second); err != nil {
|
||||
return fmt.Errorf("coredeno: deno socket: %w", err)
|
||||
}
|
||||
dc, err := DialDeno(opts.DenoSocketPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("coredeno: deno client: %w", err)
|
||||
}
|
||||
s.denoClient = dc
|
||||
}
|
||||
}
|
||||
|
||||
// 8. Create installer and auto-load installed modules
|
||||
if opts.AppRoot != "" {
|
||||
modulesDir := filepath.Join(opts.AppRoot, "modules")
|
||||
s.installer = marketplace.NewInstaller(modulesDir, s.store)
|
||||
|
||||
if s.denoClient != nil {
|
||||
installed, listErr := s.installer.Installed()
|
||||
if listErr == nil {
|
||||
for _, mod := range installed {
|
||||
perms := ModulePermissions{
|
||||
Read: mod.Permissions.Read,
|
||||
Write: mod.Permissions.Write,
|
||||
Net: mod.Permissions.Net,
|
||||
Run: mod.Permissions.Run,
|
||||
}
|
||||
s.denoClient.LoadModule(mod.Code, mod.EntryPoint, perms)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnShutdown stops the Deno sidecar. Called by the framework on app shutdown.
|
||||
// OnShutdown stops the CoreDeno subsystem. Called by the framework on app shutdown.
|
||||
func (s *Service) OnShutdown(_ context.Context) error {
|
||||
return s.sidecar.Stop()
|
||||
// Close Deno client connection
|
||||
if s.denoClient != nil {
|
||||
s.denoClient.Close()
|
||||
}
|
||||
|
||||
// Stop sidecar
|
||||
_ = s.sidecar.Stop()
|
||||
|
||||
// Stop gRPC listener
|
||||
if s.grpcCancel != nil {
|
||||
s.grpcCancel()
|
||||
<-s.grpcDone
|
||||
}
|
||||
|
||||
// Close store
|
||||
if s.store != nil {
|
||||
s.store.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sidecar returns the underlying sidecar for direct access.
|
||||
func (s *Service) Sidecar() *Sidecar {
|
||||
return s.sidecar
|
||||
}
|
||||
|
||||
// GRPCServer returns the gRPC server for direct access.
|
||||
func (s *Service) GRPCServer() *Server {
|
||||
return s.grpcServer
|
||||
}
|
||||
|
||||
// DenoClient returns the DenoService client for calling the Deno sidecar.
|
||||
// Returns nil if the sidecar was not started or has no DenoSocketPath.
|
||||
func (s *Service) DenoClient() *DenoClient {
|
||||
return s.denoClient
|
||||
}
|
||||
|
||||
// Installer returns the marketplace module installer.
|
||||
// Returns nil if AppRoot was not set.
|
||||
func (s *Service) Installer() *marketplace.Installer {
|
||||
return s.installer
|
||||
}
|
||||
|
||||
// waitForSocket polls until a Unix socket file appears or the context/timeout expires.
|
||||
func waitForSocket(ctx context.Context, path string, timeout time.Duration) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
for {
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
return nil
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return fmt.Errorf("timeout waiting for socket %s", path)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,11 +2,17 @@ package coredeno
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pb "forge.lthn.ai/core/go/pkg/coredeno/proto"
|
||||
core "forge.lthn.ai/core/go/pkg/framework/core"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
)
|
||||
|
||||
func TestNewServiceFactory_Good(t *testing.T) {
|
||||
|
|
@ -37,18 +43,27 @@ func TestService_WithService_Good(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestService_Lifecycle_Good(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "lifecycle.sock")
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(Options{DenoPath: "echo"})
|
||||
factory := NewServiceFactory(Options{
|
||||
DenoPath: "echo",
|
||||
SocketPath: sockPath,
|
||||
})
|
||||
result, _ := factory(c)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Verify Startable
|
||||
err = svc.OnStartup(context.Background())
|
||||
err = svc.OnStartup(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Verify Stoppable (not started, should be no-op)
|
||||
// Verify Stoppable
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
|
@ -63,3 +78,106 @@ func TestService_Sidecar_Good(t *testing.T) {
|
|||
|
||||
assert.NotNil(t, svc.Sidecar())
|
||||
}
|
||||
|
||||
func TestService_OnStartup_Good(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
|
||||
// Write a minimal manifest
|
||||
coreDir := filepath.Join(tmpDir, ".core")
|
||||
require.NoError(t, os.MkdirAll(coreDir, 0755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(coreDir, "view.yml"), []byte(`
|
||||
code: test-app
|
||||
name: Test App
|
||||
version: "1.0"
|
||||
permissions:
|
||||
read: ["./data/"]
|
||||
write: ["./data/"]
|
||||
`), 0644))
|
||||
|
||||
opts := Options{
|
||||
DenoPath: "sleep",
|
||||
SocketPath: sockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
SidecarArgs: []string{"60"},
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(opts)
|
||||
result, err := factory(c)
|
||||
require.NoError(t, err)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
err = svc.OnStartup(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify socket appeared
|
||||
require.Eventually(t, func() bool {
|
||||
_, err := os.Stat(sockPath)
|
||||
return err == nil
|
||||
}, 2*time.Second, 10*time.Millisecond, "gRPC socket should appear after startup")
|
||||
|
||||
// Verify gRPC responds
|
||||
conn, err := grpc.NewClient(
|
||||
"unix://"+sockPath,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
client := pb.NewCoreServiceClient(conn)
|
||||
_, err = client.StoreSet(ctx, &pb.StoreSetRequest{
|
||||
Group: "boot", Key: "ok", Value: "true",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := client.StoreGet(ctx, &pb.StoreGetRequest{
|
||||
Group: "boot", Key: "ok",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, resp.Found)
|
||||
assert.Equal(t, "true", resp.Value)
|
||||
|
||||
// Verify sidecar is running
|
||||
assert.True(t, svc.sidecar.IsRunning(), "sidecar should be running")
|
||||
|
||||
// Shutdown
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, svc.sidecar.IsRunning(), "sidecar should be stopped")
|
||||
}
|
||||
|
||||
func TestService_OnStartup_Good_NoManifest(t *testing.T) {
|
||||
tmpDir := t.TempDir()
|
||||
sockPath := filepath.Join(tmpDir, "core.sock")
|
||||
|
||||
opts := Options{
|
||||
DenoPath: "sleep",
|
||||
SocketPath: sockPath,
|
||||
AppRoot: tmpDir,
|
||||
StoreDBPath: ":memory:",
|
||||
}
|
||||
|
||||
c, err := core.New()
|
||||
require.NoError(t, err)
|
||||
|
||||
factory := NewServiceFactory(opts)
|
||||
result, _ := factory(c)
|
||||
svc := result.(*Service)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Should succeed even without .core/view.yml
|
||||
err = svc.OnStartup(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = svc.OnShutdown(context.Background())
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ import (
|
|||
|
||||
const manifestPath = ".core/view.yml"
|
||||
|
||||
// marshalYAML serializes a manifest to YAML bytes.
|
||||
func marshalYAML(m *Manifest) ([]byte, error) {
|
||||
// MarshalYAML serializes a manifest to YAML bytes.
|
||||
func MarshalYAML(m *Manifest) ([]byte, error) {
|
||||
return yaml.Marshal(m)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func TestLoadVerified_Good(t *testing.T) {
|
|||
}
|
||||
_ = Sign(m, priv)
|
||||
|
||||
raw, _ := marshalYAML(m)
|
||||
raw, _ := MarshalYAML(m)
|
||||
fs := io.NewMockMedium()
|
||||
fs.Files[".core/view.yml"] = string(raw)
|
||||
|
||||
|
|
@ -53,7 +53,7 @@ func TestLoadVerified_Bad_Tampered(t *testing.T) {
|
|||
m := &Manifest{Code: "app", Version: "1.0.0"}
|
||||
_ = Sign(m, priv)
|
||||
|
||||
raw, _ := marshalYAML(m)
|
||||
raw, _ := MarshalYAML(m)
|
||||
tampered := "code: evil\n" + string(raw)[6:]
|
||||
fs := io.NewMockMedium()
|
||||
fs.Files[".core/view.yml"] = tampered
|
||||
|
|
|
|||
194
pkg/marketplace/installer.go
Normal file
194
pkg/marketplace/installer.go
Normal file
|
|
@ -0,0 +1,194 @@
|
|||
package marketplace
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"forge.lthn.ai/core/go/pkg/io"
|
||||
"forge.lthn.ai/core/go/pkg/manifest"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
)
|
||||
|
||||
const storeGroup = "_modules"
|
||||
|
||||
// Installer handles module installation from Git repos.
|
||||
type Installer struct {
|
||||
modulesDir string
|
||||
store *store.Store
|
||||
}
|
||||
|
||||
// NewInstaller creates a new module installer.
|
||||
func NewInstaller(modulesDir string, st *store.Store) *Installer {
|
||||
return &Installer{
|
||||
modulesDir: modulesDir,
|
||||
store: st,
|
||||
}
|
||||
}
|
||||
|
||||
// InstalledModule holds stored metadata about an installed module.
|
||||
type InstalledModule struct {
|
||||
Code string `json:"code"`
|
||||
Name string `json:"name"`
|
||||
Version string `json:"version"`
|
||||
Repo string `json:"repo"`
|
||||
EntryPoint string `json:"entry_point"`
|
||||
Permissions manifest.Permissions `json:"permissions"`
|
||||
InstalledAt string `json:"installed_at"`
|
||||
}
|
||||
|
||||
// Install clones a module repo, verifies its manifest signature, and registers it.
|
||||
func (i *Installer) Install(ctx context.Context, mod Module) error {
|
||||
// Check if already installed
|
||||
if _, err := i.store.Get(storeGroup, mod.Code); err == nil {
|
||||
return fmt.Errorf("marketplace: module %q already installed", mod.Code)
|
||||
}
|
||||
|
||||
dest := filepath.Join(i.modulesDir, mod.Code)
|
||||
if err := os.MkdirAll(i.modulesDir, 0755); err != nil {
|
||||
return fmt.Errorf("marketplace: mkdir: %w", err)
|
||||
}
|
||||
if err := gitClone(ctx, mod.Repo, dest); err != nil {
|
||||
return fmt.Errorf("marketplace: clone %s: %w", mod.Repo, err)
|
||||
}
|
||||
|
||||
// On any error after clone, clean up the directory
|
||||
cleanup := true
|
||||
defer func() {
|
||||
if cleanup {
|
||||
os.RemoveAll(dest)
|
||||
}
|
||||
}()
|
||||
|
||||
medium, err := io.NewSandboxed(dest)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marketplace: medium: %w", err)
|
||||
}
|
||||
|
||||
m, err := loadManifest(medium, mod.SignKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
entryPoint := filepath.Join(dest, "main.ts")
|
||||
installed := InstalledModule{
|
||||
Code: mod.Code,
|
||||
Name: m.Name,
|
||||
Version: m.Version,
|
||||
Repo: mod.Repo,
|
||||
EntryPoint: entryPoint,
|
||||
Permissions: m.Permissions,
|
||||
InstalledAt: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(installed)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marketplace: marshal: %w", err)
|
||||
}
|
||||
|
||||
if err := i.store.Set(storeGroup, mod.Code, string(data)); err != nil {
|
||||
return fmt.Errorf("marketplace: store: %w", err)
|
||||
}
|
||||
|
||||
cleanup = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove uninstalls a module by deleting its files and store entry.
|
||||
func (i *Installer) Remove(code string) error {
|
||||
if _, err := i.store.Get(storeGroup, code); err != nil {
|
||||
return fmt.Errorf("marketplace: module %q not installed", code)
|
||||
}
|
||||
|
||||
dest := filepath.Join(i.modulesDir, code)
|
||||
os.RemoveAll(dest)
|
||||
|
||||
return i.store.Delete(storeGroup, code)
|
||||
}
|
||||
|
||||
// Update pulls latest changes and re-verifies the manifest.
|
||||
func (i *Installer) Update(ctx context.Context, code string) error {
|
||||
raw, err := i.store.Get(storeGroup, code)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marketplace: module %q not installed", code)
|
||||
}
|
||||
|
||||
var installed InstalledModule
|
||||
if err := json.Unmarshal([]byte(raw), &installed); err != nil {
|
||||
return fmt.Errorf("marketplace: unmarshal: %w", err)
|
||||
}
|
||||
|
||||
dest := filepath.Join(i.modulesDir, code)
|
||||
|
||||
cmd := exec.CommandContext(ctx, "git", "-C", dest, "pull", "--ff-only")
|
||||
if output, err := cmd.CombinedOutput(); err != nil {
|
||||
return fmt.Errorf("marketplace: pull: %s: %w", strings.TrimSpace(string(output)), err)
|
||||
}
|
||||
|
||||
// Reload manifest
|
||||
medium, mErr := io.NewSandboxed(dest)
|
||||
if mErr != nil {
|
||||
return fmt.Errorf("marketplace: medium: %w", mErr)
|
||||
}
|
||||
m, mErr := manifest.Load(medium, ".")
|
||||
if mErr != nil {
|
||||
return fmt.Errorf("marketplace: reload manifest: %w", mErr)
|
||||
}
|
||||
|
||||
// Update stored metadata
|
||||
installed.Name = m.Name
|
||||
installed.Version = m.Version
|
||||
installed.Permissions = m.Permissions
|
||||
|
||||
data, err := json.Marshal(installed)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marketplace: marshal: %w", err)
|
||||
}
|
||||
|
||||
return i.store.Set(storeGroup, code, string(data))
|
||||
}
|
||||
|
||||
// Installed returns all installed module metadata.
|
||||
func (i *Installer) Installed() ([]InstalledModule, error) {
|
||||
all, err := i.store.GetAll(storeGroup)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marketplace: list: %w", err)
|
||||
}
|
||||
|
||||
var modules []InstalledModule
|
||||
for _, raw := range all {
|
||||
var m InstalledModule
|
||||
if err := json.Unmarshal([]byte(raw), &m); err != nil {
|
||||
continue
|
||||
}
|
||||
modules = append(modules, m)
|
||||
}
|
||||
return modules, nil
|
||||
}
|
||||
|
||||
// loadManifest loads and optionally verifies a module manifest.
|
||||
func loadManifest(medium io.Medium, signKey string) (*manifest.Manifest, error) {
|
||||
if signKey != "" {
|
||||
pubBytes, err := hex.DecodeString(signKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marketplace: decode sign key: %w", err)
|
||||
}
|
||||
return manifest.LoadVerified(medium, ".", pubBytes)
|
||||
}
|
||||
return manifest.Load(medium, ".")
|
||||
}
|
||||
|
||||
// gitClone clones a repository with --depth=1.
|
||||
func gitClone(ctx context.Context, repo, dest string) error {
|
||||
cmd := exec.CommandContext(ctx, "git", "clone", "--depth=1", repo, dest)
|
||||
if output, err := cmd.CombinedOutput(); err != nil {
|
||||
return fmt.Errorf("%s: %w", strings.TrimSpace(string(output)), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
263
pkg/marketplace/installer_test.go
Normal file
263
pkg/marketplace/installer_test.go
Normal file
|
|
@ -0,0 +1,263 @@
|
|||
package marketplace
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"encoding/hex"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"forge.lthn.ai/core/go/pkg/manifest"
|
||||
"forge.lthn.ai/core/go/pkg/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// createTestRepo creates a bare-bones git repo with a manifest and main.ts.
|
||||
// Returns the repo path (usable as Module.Repo for local clone).
|
||||
func createTestRepo(t *testing.T, code, version string) string {
|
||||
t.Helper()
|
||||
dir := filepath.Join(t.TempDir(), code)
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, ".core"), 0755))
|
||||
|
||||
manifestYAML := "code: " + code + "\nname: Test " + code + "\nversion: \"" + version + "\"\n"
|
||||
require.NoError(t, os.WriteFile(
|
||||
filepath.Join(dir, ".core", "view.yml"),
|
||||
[]byte(manifestYAML), 0644,
|
||||
))
|
||||
require.NoError(t, os.WriteFile(
|
||||
filepath.Join(dir, "main.ts"),
|
||||
[]byte("export async function init(core: any) {}\n"), 0644,
|
||||
))
|
||||
|
||||
runGit(t, dir, "init")
|
||||
runGit(t, dir, "add", ".")
|
||||
runGit(t, dir, "commit", "-m", "init")
|
||||
return dir
|
||||
}
|
||||
|
||||
// createSignedTestRepo creates a git repo with a signed manifest.
|
||||
// Returns (repo path, hex-encoded public key).
|
||||
func createSignedTestRepo(t *testing.T, code, version string) (string, string) {
|
||||
t.Helper()
|
||||
pub, priv, err := ed25519.GenerateKey(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
dir := filepath.Join(t.TempDir(), code)
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, ".core"), 0755))
|
||||
|
||||
m := &manifest.Manifest{
|
||||
Code: code,
|
||||
Name: "Test " + code,
|
||||
Version: version,
|
||||
}
|
||||
require.NoError(t, manifest.Sign(m, priv))
|
||||
|
||||
data, err := manifest.MarshalYAML(m)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, os.WriteFile(filepath.Join(dir, ".core", "view.yml"), data, 0644))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(dir, "main.ts"), []byte("export async function init(core: any) {}\n"), 0644))
|
||||
|
||||
runGit(t, dir, "init")
|
||||
runGit(t, dir, "add", ".")
|
||||
runGit(t, dir, "commit", "-m", "init")
|
||||
|
||||
return dir, hex.EncodeToString(pub)
|
||||
}
|
||||
|
||||
func runGit(t *testing.T, dir string, args ...string) {
|
||||
t.Helper()
|
||||
cmd := exec.Command("git", append([]string{"-C", dir, "-c", "user.email=test@test.com", "-c", "user.name=test"}, args...)...)
|
||||
out, err := cmd.CombinedOutput()
|
||||
require.NoError(t, err, "git %v: %s", args, string(out))
|
||||
}
|
||||
|
||||
func TestInstall_Good(t *testing.T) {
|
||||
repo := createTestRepo(t, "hello-mod", "1.0")
|
||||
modulesDir := filepath.Join(t.TempDir(), "modules")
|
||||
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(modulesDir, st)
|
||||
err = inst.Install(context.Background(), Module{
|
||||
Code: "hello-mod",
|
||||
Repo: repo,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify directory exists
|
||||
_, err = os.Stat(filepath.Join(modulesDir, "hello-mod", "main.ts"))
|
||||
assert.NoError(t, err, "main.ts should exist in installed module")
|
||||
|
||||
// Verify store entry
|
||||
raw, err := st.Get("_modules", "hello-mod")
|
||||
require.NoError(t, err)
|
||||
assert.Contains(t, raw, `"code":"hello-mod"`)
|
||||
assert.Contains(t, raw, `"version":"1.0"`)
|
||||
}
|
||||
|
||||
func TestInstall_Good_Signed(t *testing.T) {
|
||||
repo, signKey := createSignedTestRepo(t, "signed-mod", "2.0")
|
||||
modulesDir := filepath.Join(t.TempDir(), "modules")
|
||||
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(modulesDir, st)
|
||||
err = inst.Install(context.Background(), Module{
|
||||
Code: "signed-mod",
|
||||
Repo: repo,
|
||||
SignKey: signKey,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
raw, err := st.Get("_modules", "signed-mod")
|
||||
require.NoError(t, err)
|
||||
assert.Contains(t, raw, `"version":"2.0"`)
|
||||
}
|
||||
|
||||
func TestInstall_Bad_AlreadyInstalled(t *testing.T) {
|
||||
repo := createTestRepo(t, "dup-mod", "1.0")
|
||||
modulesDir := filepath.Join(t.TempDir(), "modules")
|
||||
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(modulesDir, st)
|
||||
mod := Module{Code: "dup-mod", Repo: repo}
|
||||
|
||||
require.NoError(t, inst.Install(context.Background(), mod))
|
||||
err = inst.Install(context.Background(), mod)
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "already installed")
|
||||
}
|
||||
|
||||
func TestInstall_Bad_InvalidSignature(t *testing.T) {
|
||||
// Sign with key A, verify with key B
|
||||
repo, _ := createSignedTestRepo(t, "bad-sig", "1.0")
|
||||
_, wrongKey := createSignedTestRepo(t, "dummy", "1.0") // different key
|
||||
|
||||
modulesDir := filepath.Join(t.TempDir(), "modules")
|
||||
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(modulesDir, st)
|
||||
err = inst.Install(context.Background(), Module{
|
||||
Code: "bad-sig",
|
||||
Repo: repo,
|
||||
SignKey: wrongKey,
|
||||
})
|
||||
assert.Error(t, err)
|
||||
|
||||
// Verify directory was cleaned up
|
||||
_, statErr := os.Stat(filepath.Join(modulesDir, "bad-sig"))
|
||||
assert.True(t, os.IsNotExist(statErr), "directory should be cleaned up on failure")
|
||||
}
|
||||
|
||||
func TestRemove_Good(t *testing.T) {
|
||||
repo := createTestRepo(t, "rm-mod", "1.0")
|
||||
modulesDir := filepath.Join(t.TempDir(), "modules")
|
||||
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(modulesDir, st)
|
||||
require.NoError(t, inst.Install(context.Background(), Module{Code: "rm-mod", Repo: repo}))
|
||||
|
||||
err = inst.Remove("rm-mod")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Directory gone
|
||||
_, statErr := os.Stat(filepath.Join(modulesDir, "rm-mod"))
|
||||
assert.True(t, os.IsNotExist(statErr))
|
||||
|
||||
// Store entry gone
|
||||
_, err = st.Get("_modules", "rm-mod")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestRemove_Bad_NotInstalled(t *testing.T) {
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(t.TempDir(), st)
|
||||
err = inst.Remove("nonexistent")
|
||||
assert.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "not installed")
|
||||
}
|
||||
|
||||
func TestInstalled_Good(t *testing.T) {
|
||||
modulesDir := filepath.Join(t.TempDir(), "modules")
|
||||
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(modulesDir, st)
|
||||
|
||||
repo1 := createTestRepo(t, "mod-a", "1.0")
|
||||
repo2 := createTestRepo(t, "mod-b", "2.0")
|
||||
|
||||
require.NoError(t, inst.Install(context.Background(), Module{Code: "mod-a", Repo: repo1}))
|
||||
require.NoError(t, inst.Install(context.Background(), Module{Code: "mod-b", Repo: repo2}))
|
||||
|
||||
installed, err := inst.Installed()
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, installed, 2)
|
||||
|
||||
codes := map[string]bool{}
|
||||
for _, m := range installed {
|
||||
codes[m.Code] = true
|
||||
}
|
||||
assert.True(t, codes["mod-a"])
|
||||
assert.True(t, codes["mod-b"])
|
||||
}
|
||||
|
||||
func TestInstalled_Good_Empty(t *testing.T) {
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(t.TempDir(), st)
|
||||
installed, err := inst.Installed()
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, installed)
|
||||
}
|
||||
|
||||
func TestUpdate_Good(t *testing.T) {
|
||||
repo := createTestRepo(t, "upd-mod", "1.0")
|
||||
modulesDir := filepath.Join(t.TempDir(), "modules")
|
||||
|
||||
st, err := store.New(":memory:")
|
||||
require.NoError(t, err)
|
||||
defer st.Close()
|
||||
|
||||
inst := NewInstaller(modulesDir, st)
|
||||
require.NoError(t, inst.Install(context.Background(), Module{Code: "upd-mod", Repo: repo}))
|
||||
|
||||
// Update the origin repo
|
||||
newManifest := "code: upd-mod\nname: Updated Module\nversion: \"2.0\"\n"
|
||||
require.NoError(t, os.WriteFile(filepath.Join(repo, ".core", "view.yml"), []byte(newManifest), 0644))
|
||||
runGit(t, repo, "add", ".")
|
||||
runGit(t, repo, "commit", "-m", "bump version")
|
||||
|
||||
err = inst.Update(context.Background(), "upd-mod")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify updated metadata
|
||||
installed, err := inst.Installed()
|
||||
require.NoError(t, err)
|
||||
require.Len(t, installed, 1)
|
||||
assert.Equal(t, "2.0", installed[0].Version)
|
||||
assert.Equal(t, "Updated Module", installed[0].Name)
|
||||
}
|
||||
|
|
@ -95,6 +95,25 @@ func (s *Store) DeleteGroup(group string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetAll returns all key-value pairs in a group.
|
||||
func (s *Store) GetAll(group string) (map[string]string, error) {
|
||||
rows, err := s.db.Query("SELECT key, value FROM kv WHERE grp = ?", group)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("store.GetAll: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
result := make(map[string]string)
|
||||
for rows.Next() {
|
||||
var k, v string
|
||||
if err := rows.Scan(&k, &v); err != nil {
|
||||
return nil, fmt.Errorf("store.GetAll: scan: %w", err)
|
||||
}
|
||||
result[k] = v
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Render loads all key-value pairs from a group and renders a Go template.
|
||||
func (s *Store) Render(tmplStr, group string) (string, error) {
|
||||
rows, err := s.db.Query("SELECT key, value FROM kv WHERE grp = ?", group)
|
||||
|
|
|
|||
|
|
@ -66,6 +66,28 @@ func TestDeleteGroup_Good(t *testing.T) {
|
|||
assert.Equal(t, 0, n)
|
||||
}
|
||||
|
||||
func TestGetAll_Good(t *testing.T) {
|
||||
s, _ := New(":memory:")
|
||||
defer s.Close()
|
||||
|
||||
_ = s.Set("grp", "a", "1")
|
||||
_ = s.Set("grp", "b", "2")
|
||||
_ = s.Set("other", "c", "3")
|
||||
|
||||
all, err := s.GetAll("grp")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"a": "1", "b": "2"}, all)
|
||||
}
|
||||
|
||||
func TestGetAll_Good_Empty(t *testing.T) {
|
||||
s, _ := New(":memory:")
|
||||
defer s.Close()
|
||||
|
||||
all, err := s.GetAll("empty")
|
||||
require.NoError(t, err)
|
||||
assert.Empty(t, all)
|
||||
}
|
||||
|
||||
func TestRender_Good(t *testing.T) {
|
||||
s, _ := New(":memory:")
|
||||
defer s.Close()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue