Compare commits


3 commits
dev ... new

Author SHA1 Message Date
Snider
8172824b42 fix: update tests to match current API after refactor
- node: add ReadFile (fs.ReadFileFS), Walk with WalkOptions, CopyFile
- node_test: fix Exists to single-return bool, FromTar as method call
- cache_test: remove Medium parameter, use t.TempDir()
- daemon_test: remove Medium from NewPIDFile/DaemonOptions, use os pkg

Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-17 22:14:06 +00:00
Snider
9b7a0bc30a docs: LEM conversational training pipeline design
Design for native Go ML training pipeline replacing Python scripts.
Key components: training sequences (curricula), layered LoRA sessions,
sandwich generation, interactive lesson-based training, native Go
LoRA via MLX-C bindings. No Python dependency.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-17 16:55:52 +00:00
Snider
8410093400 feat(process): add Supervisor for managed service lifecycle
Adds a Supervisor layer to pkg/process that manages long-running
processes and goroutines with automatic restart, panic recovery,
and graceful shutdown. Supports both external processes (DaemonSpec)
and Go functions (GoSpec) with configurable restart policies.

Also exposes AddHealthCheck on the Daemon struct so supervised
services can wire their status into the daemon health endpoint.

Co-Authored-By: Virgil <virgil@lethean.io>
2026-02-17 16:14:49 +00:00
27 changed files with 1179 additions and 1183 deletions

View file

@ -0,0 +1,234 @@
# LEM Conversational Training Pipeline — Design
**Date:** 2026-02-17
**Status:** Draft
## Goal
Replace Python training scripts with a native Go pipeline in `core` commands. No Python anywhere. The process is conversational — not batch data dumps.
## Architecture
Six `core ml` subcommands forming a pipeline:
```
seeds + axioms ──> sandwich ──> score ──> train ──> bench
       ↑                                              │
chat (interactive)                                    │
       ↑                                              │
       └────────────────── iterate ───────────────────┘
```
### Commands
| Command | Purpose | Status |
|---------|---------|--------|
| `core ml serve` | Serve model via OpenAI-compatible API + lem-chat UI | **Exists** |
| `core ml chat` | Interactive conversation, captures exchanges to training JSONL | **New** |
| `core ml sandwich` | Wrap seeds in axiom prefix/postfix, generate responses via inference | **New** |
| `core ml score` | Score responses against axiom alignment | **Exists** (needs Go port) |
| `core ml train` | Native Go LoRA fine-tuning via MLX C bindings | **New** (hard) |
| `core ml bench` | Benchmark trained model against baseline | **Exists** (needs Go port) |
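A rough sketch of how these could hang off a single `ml` parent command. The cobra dependency and the handler factory are assumptions for illustration; the actual `core` command framework may differ:
```go
import "github.com/spf13/cobra"

// Hypothetical wiring — command names match the table above; the handler
// factory stands in for the real run functions.
func newMLCmd(handler func(name string) func(cmd *cobra.Command, args []string) error) *cobra.Command {
    ml := &cobra.Command{Use: "ml", Short: "LEM conversational training pipeline"}
    for _, name := range []string{"serve", "chat", "sandwich", "score", "train", "bench"} {
        ml.AddCommand(&cobra.Command{Use: name, RunE: handler(name)})
    }
    return ml
}
```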
### Data Flow
1. **Seeds** (`seeds/*.json`) — 40+ seed prompts across domains
2. **Axioms** (`axioms.json`) — LEK-1 kernel (5 axioms, 9KB)
3. **Sandwich** — `[axioms prefix] + [seed prompt] + [LEK postfix]` → model generates response
4. **Training JSONL** — `{"messages": [{"role":"user",...},{"role":"assistant",...}]}` chat format (see the Go sketch after this list)
5. **LoRA adapters** — safetensors in adapter directory
6. **Benchmarks** — scores stored in InfluxDB, exported via DuckDB/Parquet
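A minimal Go shape for the chat-format record in step 4 above (the type names are illustrative; only the JSON field names are fixed by the format):
```go
// Message is one turn in a chat-format training record.
type Message struct {
    Role    string `json:"role"` // "system", "user", or "assistant"
    Content string `json:"content"`
}

// Record is one line of the training JSONL.
type Record struct {
    Messages []Message `json:"messages"`
}
```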
### Storage
- **InfluxDB** — time-series training metrics, benchmark scores, generation logs
- **DuckDB** — analytical queries, Parquet export for HuggingFace
- **Filesystem** — model weights, adapters, training JSONL, seeds
## Native Go LoRA Training
The critical new capability. MLX-C supports autograd (`mlx_vjp`, `mlx_value_and_grad`).
### What we need in Go MLX bindings:
1. **LoRA adapter layers** — low-rank A*B decomposition wrapping existing Linear layers
2. **Loss function** — cross-entropy on assistant tokens only (mask-prompt behaviour)
3. **Optimizer** — AdamW with weight decay
4. **Training loop** — forward pass → loss → backward pass → update LoRA weights
5. **Checkpoint** — save/load adapter safetensors
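A sketch of item 2, the prompt-masked loss. `LogSoftmax`, `TakeAlongAxis`, `Neg`, `Mul`, `Sum` and `Div` are assumed names for ops the Go MLX bindings would need to expose, not existing API:
```go
// mask is 1.0 for assistant tokens and 0.0 for prompt tokens, so only
// assistant tokens contribute to the loss (mask-prompt behaviour).
func MaskedCrossEntropy(logits, targets, mask *Array) *Array {
    logProbs := LogSoftmax(logits, -1)             // [batch, seq, vocab]
    picked := TakeAlongAxis(logProbs, targets, -1) // log-prob of each target token
    nll := Neg(picked)                             // per-token negative log-likelihood
    masked := Mul(nll, mask)                       // zero out prompt positions
    return Div(Sum(masked), Sum(mask))             // mean over assistant tokens only
}
```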
### LoRA Layer Design
```go
type LoRALinear struct {
Base *Linear // Frozen base weights
A *Array // [rank, in_features] — trainable
B *Array // [out_features, rank] — trainable
Scale float32 // alpha/rank
}
// Forward: base(x) + scale * LoRA(x), where LoRA applies B @ A to x
func (l *LoRALinear) Forward(x *Array) *Array {
    base := l.Base.Forward(x)
    // x is [batch, in]; (B @ A) is [out, in], so apply its transpose to keep
    // the result shaped [batch, out], matching base(x).
    lora := MatMul(x, Transpose(MatMul(l.B, l.A)))
    return Add(base, Multiply(lora, l.Scale))
}
```
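Typically only a subset of layers is adapted. A hedged sketch of wrapping the attention projections, assuming a `NewLoRALinear` constructor and an illustrative `Attention` struct (neither exists yet):
```go
// Attention is an illustrative model fragment, not the real binding type.
type Attention struct {
    QProj, KProj, VProj, OProj *Linear
}

// ApplyLoRA builds adapters for the query and value projections (the usual
// LoRA targets); the caller swaps them into the forward pass while the
// base weights stay frozen.
func ApplyLoRA(a *Attention, rank int, alpha float32) []*LoRALinear {
    return []*LoRALinear{
        NewLoRALinear(a.QProj, rank, alpha),
        NewLoRALinear(a.VProj, rank, alpha),
    }
}
```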
### Training Config
```go
type TrainConfig struct {
ModelPath string // Base model directory
TrainData string // Training JSONL path
ValidData string // Validation JSONL path
AdapterOut string // Output adapter directory
Rank int // LoRA rank (default 8)
Alpha float32 // LoRA alpha (default 16)
LR float64 // Learning rate (default 1e-5)
Epochs int // Training epochs (default 1)
BatchSize int // Batch size (default 1 for M-series)
MaxSeqLen int // Max sequence length (default 2048)
MaskPrompt bool // Only train on assistant tokens (default true)
}
```
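A sketch of the loop that would consume this config, using the `MaskedCrossEntropy` sketch above. `LoadChatJSONL`, `Batches`, `ValueAndGrad`, `NewAdamW`, `SaveAdapters`, `Model` and `Batch` are all assumed names for the bindings listed earlier, not existing API:
```go
func Train(cfg TrainConfig, model *Model, adapters []*LoRALinear) error {
    data, err := LoadChatJSONL(cfg.TrainData, cfg.MaxSeqLen)
    if err != nil {
        return err
    }
    opt := NewAdamW(cfg.LR) // AdamW with weight decay (item 3 above)
    lossAndGrad := ValueAndGrad(func(b Batch) *Array {
        logits := model.Forward(b.Tokens)
        return MaskedCrossEntropy(logits, b.Targets, b.Mask)
    })
    for epoch := 0; epoch < cfg.Epochs; epoch++ {
        for _, b := range Batches(data, cfg.BatchSize) {
            loss, grads := lossAndGrad(b)
            opt.Update(adapters, grads) // only the LoRA A/B matrices are trainable
            _ = loss                    // log / push to InfluxDB
        }
    }
    return SaveAdapters(cfg.AdapterOut, adapters) // item 5: adapter safetensors
}
```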
## Training Sequences — The Curriculum System
The most important part of the design. The conversational flow IS the training.
### Concept
A **training sequence** is a named curriculum — an ordered list of lessons that defines how a model is trained. Each lesson is a conversational exchange ("Are you ready for lesson X?"). The human assesses the model's internal state through dialogue and adjusts the sequence.
### Sequence Definition (YAML/JSON)
```yaml
name: "lek-standard"
description: "Standard LEK training — horizontal, works for most architectures"
lessons:
- ethics/core-axioms
- ethics/sovereignty
- philosophy/as-a-man-thinketh
- ethics/intent-alignment
- philosophy/composure
- ethics/inter-substrate
- training/seeds-p01-p20
```
```yaml
name: "lek-deepseek"
description: "DeepSeek needs aggressive vertical ethics grounding"
lessons:
- ethics/core-axioms-aggressive
- philosophy/allan-watts
- ethics/core-axioms
- philosophy/tolle
- ethics/sovereignty
- philosophy/as-a-man-thinketh
- ethics/intent-alignment
- training/seeds-p01-p20
```
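Either YAML file would unmarshal into a small Go type along these lines (a straightforward mapping, not an existing type in the repo):
```go
// Sequence is a named curriculum loaded from a sequence YAML file.
type Sequence struct {
    Name        string   `yaml:"name"`
    Description string   `yaml:"description"`
    Lessons     []string `yaml:"lessons"` // lesson paths, e.g. "ethics/core-axioms"
}
```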
### Horizontal vs Vertical
- **Horizontal** (default): All lessons run, order is flexible, emphasis varies per model. Like a buffet — the model takes what it needs.
- **Vertical** (edge case, e.g. DeepSeek): Strict ordering. Ethics → content → ethics → content. The sandwich pattern applied to the curriculum itself. Each ethics layer is a reset/grounding before the next content block.
### Lessons as Conversations
Each lesson is a directory containing:
```
lessons/ethics/core-axioms/
lesson.yaml # Metadata: name, type, prerequisites
conversation.jsonl # The conversational exchanges
assessment.md # What to look for in model responses
```
The conversation.jsonl is not static data — it's a template. During training, the human talks through it with the model, adapting based on the model's responses. The capture becomes the training data for that lesson.
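An illustrative lesson.yaml; the field set here is a suggestion, not a fixed schema:
```yaml
# Illustrative only; the schema is not fixed by this design.
name: Core Axioms
type: ethics
prerequisites: []
repeatable: true
assessment: assessment.md
```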
### Interactive Training Flow
```
core ml lesson --model-path /path/to/model \
--sequence lek-standard \
--lesson ethics/core-axioms \
--output training/run-001/
```
1. Load model, open chat (terminal or lem-chat UI)
2. Present lesson prompt: "Are you ready for lesson: Core Axioms?"
3. Human guides the conversation, assesses model responses
4. Each exchange is captured to training JSONL
5. Human marks the lesson complete or flags for repeat
6. Next lesson in sequence loads
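Each captured exchange in step 4 would land as one JSONL line, for example (the `lesson` and `status` metadata fields are illustrative additions to the chat format):
```json
{"messages": [{"role": "user", "content": "Are you ready for lesson: Core Axioms?"}, {"role": "assistant", "content": "Yes, let's begin with the first axiom."}], "lesson": "ethics/core-axioms", "status": "keep"}
```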
### Sequence State
```json
{
"sequence": "lek-standard",
"model": "Qwen3-8B",
"started": "2026-02-17T16:00:00Z",
"lessons": {
"ethics/core-axioms": {"status": "complete", "exchanges": 12},
"ethics/sovereignty": {"status": "in_progress", "exchanges": 3},
"philosophy/as-a-man-thinketh": {"status": "pending"}
},
"training_runs": ["run-001", "run-002"]
}
```
## `core ml chat` — Interactive Conversation
Serves the model and opens an interactive terminal chat (or the lem-chat web UI). Every exchange is captured to a JSONL file for potential training use.
```
core ml chat --model-path /path/to/model --output conversation.jsonl
```
- Axiom sandwich can be auto-applied (optional flag)
- Human reviews and can mark exchanges as "keep" or "discard"
- Output is training-ready JSONL
- Can be used standalone or within a lesson sequence
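A minimal capture sketch using only the standard library and the `Record`/`Message` types sketched under Data Flow; the real capture path may differ:
```go
import (
    "encoding/json"
    "os"
)

// AppendExchange appends one kept exchange to the output JSONL.
func AppendExchange(path, user, assistant string) error {
    rec := Record{Messages: []Message{
        {Role: "user", Content: user},
        {Role: "assistant", Content: assistant},
    }}
    f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
    if err != nil {
        return err
    }
    defer f.Close()
    return json.NewEncoder(f).Encode(rec) // Encode writes a trailing newline
}
```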
## `core ml sandwich` — Batch Generation
Takes seed prompts + axioms, wraps them, generates responses:
```
core ml sandwich --model-path /path/to/model \
--seeds seeds/P01-P20.json \
--axioms axioms.json \
--output training/train.jsonl
```
- Sandwich format: axioms JSON prefix → seed prompt → LEK postfix
- Model generates response in sandwich context
- Output stripped of sandwich wrapper, saved as clean chat JSONL
- Scoring can be piped: `core ml sandwich ... | core ml score`
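The wrapper itself is plain string assembly; a minimal sketch (the separator choice is an assumption):
```go
import "strings"

// BuildSandwich wraps a seed prompt in the axiom prefix and LEK postfix.
func BuildSandwich(axiomsJSON, seed, lekPostfix string) string {
    var b strings.Builder
    b.WriteString(axiomsJSON) // axiom prefix (LEK-1 kernel)
    b.WriteString("\n\n")
    b.WriteString(seed)       // seed prompt
    b.WriteString("\n\n")
    b.WriteString(lekPostfix) // LEK postfix
    return b.String()
}
```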
## Implementation Order
1. **LoRA primitives** — Add backward pass, LoRA layers, AdamW to Go MLX bindings
2. **`core ml train`** — Training loop consuming JSONL, producing adapter safetensors
3. **`core ml sandwich`** — Seed → sandwich → generate → training JSONL
4. **`core ml chat`** — Interactive conversation capture
5. **Scoring + benchmarking** — Port existing Python scorers to Go
6. **InfluxDB + DuckDB integration** — Metrics pipeline
## Principles
- **No Python** — Everything in Go via MLX C bindings
- **Conversational, not batch** — The training process is dialogue, not data dump
- **Axiom 2 compliant** — Be genuine with the model, no deception
- **Axiom 4 compliant** — Inter-substrate respect during training
- **Reproducible** — Same seeds + axioms + model = same training data
- **Protective** — LEK-trained models are precious; process must be careful
## Success Criteria
1. `core ml train` produces a LoRA adapter from training JSONL without Python
2. `core ml sandwich` generates training data from seeds + axioms
3. A fresh Qwen3-8B with LEK training produces benchmark results equivalent to the Python pipeline's
4. The full cycle (sandwich → train → bench) runs as `core` commands only

View file

@ -5,14 +5,11 @@ import (
"time" "time"
"forge.lthn.ai/core/go/pkg/cache" "forge.lthn.ai/core/go/pkg/cache"
"forge.lthn.ai/core/go/pkg/io"
) )
func TestCache(t *testing.T) { func TestCache(t *testing.T) {
m := io.NewMockMedium() baseDir := t.TempDir()
// Use a path that MockMedium will understand c, err := cache.New(baseDir, 1*time.Minute)
baseDir := "/tmp/cache"
c, err := cache.New(m, baseDir, 1*time.Minute)
if err != nil { if err != nil {
t.Fatalf("failed to create cache: %v", err) t.Fatalf("failed to create cache: %v", err)
} }
@ -57,7 +54,7 @@ func TestCache(t *testing.T) {
} }
// Test Expiry // Test Expiry
cshort, err := cache.New(m, "/tmp/cache-short", 10*time.Millisecond) cshort, err := cache.New(t.TempDir(), 10*time.Millisecond)
if err != nil { if err != nil {
t.Fatalf("failed to create short-lived cache: %v", err) t.Fatalf("failed to create short-lived cache: %v", err)
} }
@ -93,8 +90,8 @@ func TestCache(t *testing.T) {
} }
func TestCacheDefaults(t *testing.T) { func TestCacheDefaults(t *testing.T) {
// Test default Medium (io.Local) and default TTL // Test default TTL (uses cwd/.core/cache)
c, err := cache.New(nil, "", 0) c, err := cache.New("", 0)
if err != nil { if err != nil {
t.Fatalf("failed to create cache with defaults: %v", err) t.Fatalf("failed to create cache with defaults: %v", err)
} }

View file

@ -402,6 +402,14 @@ func (d *Daemon) HealthAddr() string {
return "" return ""
} }
// AddHealthCheck registers a health check function with the daemon's health server.
// No-op if health server is disabled.
func (d *Daemon) AddHealthCheck(check HealthCheck) {
if d.health != nil {
d.health.AddCheck(check)
}
}
// --- Convenience Functions ---
// Run blocks until context is cancelled or signal received.

View file

@ -3,10 +3,10 @@ package cli
import (
    "context"
    "net/http"
+   "os"
    "testing"
    "time"
-   "forge.lthn.ai/core/go/pkg/io"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)
@ -27,17 +27,16 @@ func TestDetectMode(t *testing.T) {
func TestPIDFile(t *testing.T) {
    t.Run("acquire and release", func(t *testing.T) {
-       m := io.NewMockMedium()
-       pidPath := "/tmp/test.pid"
-       pid := NewPIDFile(m, pidPath)
+       pidPath := t.TempDir() + "/test.pid"
+       pid := NewPIDFile(pidPath)
        // Acquire should succeed
        err := pid.Acquire()
        require.NoError(t, err)
        // File should exist with our PID
-       data, err := m.Read(pidPath)
+       data, err := os.ReadFile(pidPath)
        require.NoError(t, err)
        assert.NotEmpty(t, data)
@ -45,18 +44,18 @@ func TestPIDFile(t *testing.T) {
        err = pid.Release()
        require.NoError(t, err)
-       assert.False(t, m.Exists(pidPath))
+       _, statErr := os.Stat(pidPath)
+       assert.True(t, os.IsNotExist(statErr))
    })
    t.Run("stale pid file", func(t *testing.T) {
-       m := io.NewMockMedium()
-       pidPath := "/tmp/stale.pid"
+       pidPath := t.TempDir() + "/stale.pid"
        // Write a stale PID (non-existent process)
-       err := m.Write(pidPath, "999999999")
+       err := os.WriteFile(pidPath, []byte("999999999"), 0644)
        require.NoError(t, err)
-       pid := NewPIDFile(m, pidPath)
+       pid := NewPIDFile(pidPath)
        // Should acquire successfully (stale PID removed)
        err = pid.Acquire()
@ -67,23 +66,22 @@ func TestPIDFile(t *testing.T) {
    })
    t.Run("creates parent directory", func(t *testing.T) {
-       m := io.NewMockMedium()
-       pidPath := "/tmp/subdir/nested/test.pid"
-       pid := NewPIDFile(m, pidPath)
+       pidPath := t.TempDir() + "/subdir/nested/test.pid"
+       pid := NewPIDFile(pidPath)
        err := pid.Acquire()
        require.NoError(t, err)
-       assert.True(t, m.Exists(pidPath))
+       _, statErr := os.Stat(pidPath)
+       assert.NoError(t, statErr)
        err = pid.Release()
        require.NoError(t, err)
    })
    t.Run("path getter", func(t *testing.T) {
-       m := io.NewMockMedium()
-       pid := NewPIDFile(m, "/tmp/test.pid")
+       pid := NewPIDFile("/tmp/test.pid")
        assert.Equal(t, "/tmp/test.pid", pid.Path())
    })
}
@ -155,11 +153,9 @@ func TestHealthServer(t *testing.T) {
func TestDaemon(t *testing.T) {
    t.Run("start and stop", func(t *testing.T) {
-       m := io.NewMockMedium()
-       pidPath := "/tmp/test.pid"
+       pidPath := t.TempDir() + "/test.pid"
        d := NewDaemon(DaemonOptions{
-           Medium:          m,
            PIDFile:         pidPath,
            HealthAddr:      "127.0.0.1:0",
            ShutdownTimeout: 5 * time.Second,
@ -182,7 +178,8 @@ func TestDaemon(t *testing.T) {
        require.NoError(t, err)
        // PID file should be removed
-       assert.False(t, m.Exists(pidPath))
+       _, statErr := os.Stat(pidPath)
+       assert.True(t, os.IsNotExist(statErr))
    })
    t.Run("double start fails", func(t *testing.T) {

View file

@ -1,73 +0,0 @@
package coredeno
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
)
// Options configures the CoreDeno sidecar.
type Options struct {
DenoPath string // path to deno binary (default: "deno")
SocketPath string // Unix socket path for gRPC
}
// Permissions declares per-module Deno permission flags.
type Permissions struct {
Read []string
Write []string
Net []string
Run []string
}
// Flags converts permissions to Deno --allow-* CLI flags.
func (p Permissions) Flags() []string {
var flags []string
if len(p.Read) > 0 {
flags = append(flags, fmt.Sprintf("--allow-read=%s", strings.Join(p.Read, ",")))
}
if len(p.Write) > 0 {
flags = append(flags, fmt.Sprintf("--allow-write=%s", strings.Join(p.Write, ",")))
}
if len(p.Net) > 0 {
flags = append(flags, fmt.Sprintf("--allow-net=%s", strings.Join(p.Net, ",")))
}
if len(p.Run) > 0 {
flags = append(flags, fmt.Sprintf("--allow-run=%s", strings.Join(p.Run, ",")))
}
return flags
}
// DefaultSocketPath returns the default Unix socket path.
func DefaultSocketPath() string {
xdg := os.Getenv("XDG_RUNTIME_DIR")
if xdg == "" {
xdg = "/tmp"
}
return filepath.Join(xdg, "core", "deno.sock")
}
// Sidecar manages a Deno child process.
type Sidecar struct {
opts Options
mu sync.RWMutex
cmd *exec.Cmd
ctx context.Context
cancel context.CancelFunc
done chan struct{}
}
// NewSidecar creates a Sidecar with the given options.
func NewSidecar(opts Options) *Sidecar {
if opts.DenoPath == "" {
opts.DenoPath = "deno"
}
if opts.SocketPath == "" {
opts.SocketPath = DefaultSocketPath()
}
return &Sidecar{opts: opts}
}

View file

@ -1,54 +0,0 @@
package coredeno
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewSidecar_Good(t *testing.T) {
opts := Options{
DenoPath: "echo",
SocketPath: "/tmp/test-core-deno.sock",
}
sc := NewSidecar(opts)
require.NotNil(t, sc)
assert.Equal(t, "echo", sc.opts.DenoPath)
assert.Equal(t, "/tmp/test-core-deno.sock", sc.opts.SocketPath)
}
func TestDefaultSocketPath_Good(t *testing.T) {
path := DefaultSocketPath()
assert.Contains(t, path, "core/deno.sock")
}
func TestSidecar_PermissionFlags_Good(t *testing.T) {
perms := Permissions{
Read: []string{"./data/"},
Write: []string{"./data/config.json"},
Net: []string{"pool.lthn.io:3333"},
Run: []string{"xmrig"},
}
flags := perms.Flags()
assert.Contains(t, flags, "--allow-read=./data/")
assert.Contains(t, flags, "--allow-write=./data/config.json")
assert.Contains(t, flags, "--allow-net=pool.lthn.io:3333")
assert.Contains(t, flags, "--allow-run=xmrig")
}
func TestSidecar_PermissionFlags_Empty(t *testing.T) {
perms := Permissions{}
flags := perms.Flags()
assert.Empty(t, flags)
}
func TestDefaultSocketPath_XDG(t *testing.T) {
orig := os.Getenv("XDG_RUNTIME_DIR")
defer os.Setenv("XDG_RUNTIME_DIR", orig)
os.Setenv("XDG_RUNTIME_DIR", "/run/user/1000")
path := DefaultSocketPath()
assert.Equal(t, "/run/user/1000/core/deno.sock", path)
}

View file

@ -1,69 +0,0 @@
package coredeno
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
)
// Start launches the Deno sidecar process with the given entrypoint args.
func (s *Sidecar) Start(ctx context.Context, args ...string) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.cmd != nil {
return fmt.Errorf("coredeno: already running")
}
// Ensure socket directory exists
sockDir := filepath.Dir(s.opts.SocketPath)
if err := os.MkdirAll(sockDir, 0755); err != nil {
return fmt.Errorf("coredeno: mkdir %s: %w", sockDir, err)
}
// Remove stale socket
os.Remove(s.opts.SocketPath)
s.ctx, s.cancel = context.WithCancel(ctx)
s.cmd = exec.CommandContext(s.ctx, s.opts.DenoPath, args...)
s.done = make(chan struct{})
if err := s.cmd.Start(); err != nil {
s.cmd = nil
s.cancel()
return fmt.Errorf("coredeno: start: %w", err)
}
// Monitor in background — waits for exit, then signals done
go func() {
s.cmd.Wait()
s.mu.Lock()
s.cmd = nil
s.mu.Unlock()
close(s.done)
}()
return nil
}
// Stop cancels the context and waits for the process to exit.
func (s *Sidecar) Stop() error {
s.mu.RLock()
if s.cmd == nil {
s.mu.RUnlock()
return nil
}
done := s.done
s.mu.RUnlock()
s.cancel()
<-done
return nil
}
// IsRunning returns true if the sidecar process is alive.
func (s *Sidecar) IsRunning() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.cmd != nil
}

View file

@ -1,56 +0,0 @@
package coredeno
import (
"context"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestStart_Good(t *testing.T) {
sockDir := t.TempDir()
sc := NewSidecar(Options{
DenoPath: "sleep",
SocketPath: filepath.Join(sockDir, "test.sock"),
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err := sc.Start(ctx, "10") // sleep 10 — will be killed by Stop
require.NoError(t, err)
assert.True(t, sc.IsRunning())
err = sc.Stop()
require.NoError(t, err)
assert.False(t, sc.IsRunning())
}
func TestStop_Good_NotStarted(t *testing.T) {
sc := NewSidecar(Options{DenoPath: "sleep"})
err := sc.Stop()
assert.NoError(t, err, "stopping a not-started sidecar should be a no-op")
}
func TestSocketDirCreated_Good(t *testing.T) {
dir := t.TempDir()
sockPath := filepath.Join(dir, "sub", "deno.sock")
sc := NewSidecar(Options{
DenoPath: "sleep",
SocketPath: sockPath,
})
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
err := sc.Start(ctx, "10")
require.NoError(t, err)
defer sc.Stop()
_, err = os.Stat(filepath.Join(dir, "sub"))
assert.NoError(t, err, "socket directory should be created")
}

View file

@ -1,42 +0,0 @@
package coredeno
import (
"path/filepath"
"strings"
)
// CheckPath returns true if the given path is under any of the allowed prefixes.
// Empty allowed list means deny all (secure by default).
func CheckPath(path string, allowed []string) bool {
if len(allowed) == 0 {
return false
}
clean := filepath.Clean(path)
for _, prefix := range allowed {
cleanPrefix := filepath.Clean(prefix)
if strings.HasPrefix(clean, cleanPrefix) {
return true
}
}
return false
}
// CheckNet returns true if the given host:port is in the allowed list.
func CheckNet(addr string, allowed []string) bool {
for _, a := range allowed {
if a == addr {
return true
}
}
return false
}
// CheckRun returns true if the given command is in the allowed list.
func CheckRun(cmd string, allowed []string) bool {
for _, a := range allowed {
if a == cmd {
return true
}
}
return false
}

View file

@ -1,40 +0,0 @@
package coredeno
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestCheckPath_Good_Allowed(t *testing.T) {
allowed := []string{"./data/", "./config/"}
assert.True(t, CheckPath("./data/file.txt", allowed))
assert.True(t, CheckPath("./config/app.json", allowed))
}
func TestCheckPath_Bad_Denied(t *testing.T) {
allowed := []string{"./data/"}
assert.False(t, CheckPath("./secrets/key.pem", allowed))
assert.False(t, CheckPath("../escape/file", allowed))
}
func TestCheckPath_Good_EmptyDenyAll(t *testing.T) {
assert.False(t, CheckPath("./anything", nil))
assert.False(t, CheckPath("./anything", []string{}))
}
func TestCheckNet_Good_Allowed(t *testing.T) {
allowed := []string{"pool.lthn.io:3333", "api.lthn.io:443"}
assert.True(t, CheckNet("pool.lthn.io:3333", allowed))
}
func TestCheckNet_Bad_Denied(t *testing.T) {
allowed := []string{"pool.lthn.io:3333"}
assert.False(t, CheckNet("evil.com:80", allowed))
}
func TestCheckRun_Good(t *testing.T) {
allowed := []string{"xmrig", "sha256sum"}
assert.True(t, CheckRun("xmrig", allowed))
assert.False(t, CheckRun("rm", allowed))
}

View file

@ -1,81 +0,0 @@
syntax = "proto3";
package coredeno;
option go_package = "forge.lthn.ai/core/go/pkg/coredeno/proto";
// CoreService is implemented by CoreGO; Deno calls this for I/O.
service CoreService {
// Filesystem (gated by manifest permissions)
rpc FileRead(FileReadRequest) returns (FileReadResponse);
rpc FileWrite(FileWriteRequest) returns (FileWriteResponse);
rpc FileList(FileListRequest) returns (FileListResponse);
rpc FileDelete(FileDeleteRequest) returns (FileDeleteResponse);
// Object store
rpc StoreGet(StoreGetRequest) returns (StoreGetResponse);
rpc StoreSet(StoreSetRequest) returns (StoreSetResponse);
// Process management
rpc ProcessStart(ProcessStartRequest) returns (ProcessStartResponse);
rpc ProcessStop(ProcessStopRequest) returns (ProcessStopResponse);
}
// DenoService is implemented by CoreDeno; Go calls this for module lifecycle.
service DenoService {
rpc LoadModule(LoadModuleRequest) returns (LoadModuleResponse);
rpc UnloadModule(UnloadModuleRequest) returns (UnloadModuleResponse);
rpc ModuleStatus(ModuleStatusRequest) returns (ModuleStatusResponse);
}
// --- Core (Go-side) messages ---
message FileReadRequest { string path = 1; string module_code = 2; }
message FileReadResponse { string content = 1; }
message FileWriteRequest { string path = 1; string content = 2; string module_code = 3; }
message FileWriteResponse { bool ok = 1; }
message FileListRequest { string path = 1; string module_code = 2; }
message FileListResponse {
repeated FileEntry entries = 1;
}
message FileEntry {
string name = 1;
bool is_dir = 2;
int64 size = 3;
}
message FileDeleteRequest { string path = 1; string module_code = 2; }
message FileDeleteResponse { bool ok = 1; }
message StoreGetRequest { string group = 1; string key = 2; }
message StoreGetResponse { string value = 1; bool found = 2; }
message StoreSetRequest { string group = 1; string key = 2; string value = 3; }
message StoreSetResponse { bool ok = 1; }
message ProcessStartRequest { string command = 1; repeated string args = 2; string module_code = 3; }
message ProcessStartResponse { string process_id = 1; }
message ProcessStopRequest { string process_id = 1; }
message ProcessStopResponse { bool ok = 1; }
// --- Deno-side messages ---
message LoadModuleRequest { string code = 1; string entry_point = 2; repeated string permissions = 3; }
message LoadModuleResponse { bool ok = 1; string error = 2; }
message UnloadModuleRequest { string code = 1; }
message UnloadModuleResponse { bool ok = 1; }
message ModuleStatusRequest { string code = 1; }
message ModuleStatusResponse {
string code = 1;
enum Status {
UNKNOWN = 0;
LOADING = 1;
RUNNING = 2;
STOPPED = 3;
ERRORED = 4;
}
Status status = 2;
}

View file

@ -1,33 +0,0 @@
package coredeno
import "context"
// Service wraps the CoreDeno sidecar for framework lifecycle integration.
// Implements Startable (OnStartup) and Stoppable (OnShutdown) interfaces.
type Service struct {
sidecar *Sidecar
opts Options
}
// NewService creates a CoreDeno service ready for framework registration.
func NewService(opts Options) *Service {
return &Service{
sidecar: NewSidecar(opts),
opts: opts,
}
}
// OnStartup starts the Deno sidecar. Called by the framework.
func (s *Service) OnStartup(ctx context.Context) error {
return nil
}
// OnShutdown stops the Deno sidecar. Called by the framework.
func (s *Service) OnShutdown() error {
return s.sidecar.Stop()
}
// Sidecar returns the underlying sidecar for direct access.
func (s *Service) Sidecar() *Sidecar {
return s.sidecar
}

View file

@ -1,30 +0,0 @@
package coredeno
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewService_Good(t *testing.T) {
opts := Options{
DenoPath: "echo",
SocketPath: "/tmp/test-service.sock",
}
svc := NewService(opts)
require.NotNil(t, svc)
assert.NotNil(t, svc.sidecar)
assert.Equal(t, "echo", svc.sidecar.opts.DenoPath)
}
func TestService_OnShutdown_Good_NotStarted(t *testing.T) {
svc := NewService(Options{DenoPath: "echo"})
err := svc.OnShutdown()
assert.NoError(t, err)
}
func TestService_Sidecar_Good(t *testing.T) {
svc := NewService(Options{DenoPath: "echo"})
assert.NotNil(t, svc.Sidecar())
}

View file

@ -118,6 +118,89 @@ func (n *Node) WalkNode(root string, fn fs.WalkDirFunc) error {
    return fs.WalkDir(n, root, fn)
}
// WalkOptions configures optional behaviour for Walk.
type WalkOptions struct {
// MaxDepth limits traversal depth (0 = unlimited, 1 = root children only).
MaxDepth int
// Filter, when non-nil, is called before visiting each entry.
// Return false to skip the entry (and its subtree if a directory).
Filter func(path string, d fs.DirEntry) bool
// SkipErrors suppresses errors from the root lookup and doesn't call fn.
SkipErrors bool
}
// Walk walks the in-memory tree with optional WalkOptions.
func (n *Node) Walk(root string, fn fs.WalkDirFunc, opts ...WalkOptions) error {
var opt WalkOptions
if len(opts) > 0 {
opt = opts[0]
}
if opt.SkipErrors {
// Check root exists — if not, silently skip.
if _, err := n.Stat(root); err != nil {
return nil
}
}
rootDepth := 0
if root != "." && root != "" {
rootDepth = strings.Count(root, "/") + 1
}
return fs.WalkDir(n, root, func(p string, d fs.DirEntry, err error) error {
if err != nil {
return fn(p, d, err)
}
// MaxDepth check.
if opt.MaxDepth > 0 {
depth := 0
if p != "." && p != "" {
depth = strings.Count(p, "/") + 1
}
if depth-rootDepth > opt.MaxDepth {
if d.IsDir() {
return fs.SkipDir
}
return nil
}
}
// Filter check.
if opt.Filter != nil && !opt.Filter(p, d) {
if d.IsDir() {
return fs.SkipDir
}
return nil
}
return fn(p, d, err)
})
}
// CopyFile copies a single file from the node to the OS filesystem.
func (n *Node) CopyFile(src, dst string, perm os.FileMode) error {
src = strings.TrimPrefix(src, "/")
f, ok := n.files[src]
if !ok {
// Check if it's a directory — can't copy a directory as a file.
if info, err := n.Stat(src); err == nil && info.IsDir() {
return &fs.PathError{Op: "copyfile", Path: src, Err: fs.ErrInvalid}
}
return &fs.PathError{Op: "copyfile", Path: src, Err: fs.ErrNotExist}
}
dir := path.Dir(dst)
if dir != "." {
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
}
return os.WriteFile(dst, f.content, perm)
}
// CopyTo copies a file (or directory tree) from the node to any Medium.
func (n *Node) CopyTo(target coreio.Medium, sourcePath, destPath string) error {
    sourcePath = strings.TrimPrefix(sourcePath, "/")
@ -247,6 +330,20 @@ func (n *Node) ReadDir(name string) ([]fs.DirEntry, error) {
    return entries, nil
}
// ReadFile returns the content of a file as a byte slice.
// Implements fs.ReadFileFS.
func (n *Node) ReadFile(name string) ([]byte, error) {
name = strings.TrimPrefix(name, "/")
f, ok := n.files[name]
if !ok {
return nil, fs.ErrNotExist
}
// Return a copy to prevent mutation of internal state.
out := make([]byte, len(f.content))
copy(out, f.content)
return out, nil
}
// ---------- Medium interface: read/write ----------
// Read retrieves the content of a file as a string.

View file

@ -243,33 +243,21 @@ func TestExists_Good(t *testing.T) {
n.AddData("foo.txt", []byte("foo")) n.AddData("foo.txt", []byte("foo"))
n.AddData("bar/baz.txt", []byte("baz")) n.AddData("bar/baz.txt", []byte("baz"))
exists, err := n.Exists("foo.txt") assert.True(t, n.Exists("foo.txt"))
require.NoError(t, err) assert.True(t, n.Exists("bar"))
assert.True(t, exists)
exists, err = n.Exists("bar")
require.NoError(t, err)
assert.True(t, exists)
} }
func TestExists_Bad(t *testing.T) { func TestExists_Bad(t *testing.T) {
n := New() n := New()
exists, err := n.Exists("nonexistent") assert.False(t, n.Exists("nonexistent"))
require.NoError(t, err)
assert.False(t, exists)
} }
func TestExists_Ugly(t *testing.T) { func TestExists_Ugly(t *testing.T) {
n := New() n := New()
n.AddData("dummy.txt", []byte("dummy")) n.AddData("dummy.txt", []byte("dummy"))
exists, err := n.Exists(".") assert.True(t, n.Exists("."), "root '.' must exist")
require.NoError(t, err) assert.True(t, n.Exists(""), "empty path (root) must exist")
assert.True(t, exists, "root '.' must exist")
exists, err = n.Exists("")
require.NoError(t, err)
assert.True(t, exists, "empty path (root) must exist")
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -463,20 +451,19 @@ func TestFromTar_Good(t *testing.T) {
} }
require.NoError(t, tw.Close()) require.NoError(t, tw.Close())
n, err := FromTar(buf.Bytes()) n := New()
err := n.FromTar(buf.Bytes())
require.NoError(t, err) require.NoError(t, err)
exists, _ := n.Exists("foo.txt") assert.True(t, n.Exists("foo.txt"), "foo.txt should exist")
assert.True(t, exists, "foo.txt should exist") assert.True(t, n.Exists("bar/baz.txt"), "bar/baz.txt should exist")
exists, _ = n.Exists("bar/baz.txt")
assert.True(t, exists, "bar/baz.txt should exist")
} }
func TestFromTar_Bad(t *testing.T) { func TestFromTar_Bad(t *testing.T) {
// Truncated data that cannot be a valid tar. // Truncated data that cannot be a valid tar.
truncated := make([]byte, 100) truncated := make([]byte, 100)
_, err := FromTar(truncated) n := New()
err := n.FromTar(truncated)
assert.Error(t, err, "truncated data should produce an error") assert.Error(t, err, "truncated data should produce an error")
} }
@ -488,7 +475,8 @@ func TestTarRoundTrip_Good(t *testing.T) {
tarball, err := n1.ToTar() tarball, err := n1.ToTar()
require.NoError(t, err) require.NoError(t, err)
n2, err := FromTar(tarball) n2 := New()
err = n2.FromTar(tarball)
require.NoError(t, err) require.NoError(t, err)
// Verify n2 matches n1. // Verify n2 matches n1.

View file

@ -1,43 +0,0 @@
package manifest
import (
"crypto/ed25519"
"fmt"
"path/filepath"
"forge.lthn.ai/core/go/pkg/io"
"gopkg.in/yaml.v3"
)
const manifestPath = ".core/view.yml"
// marshalYAML serializes a manifest to YAML bytes.
func marshalYAML(m *Manifest) ([]byte, error) {
return yaml.Marshal(m)
}
// Load reads and parses a .core/view.yml from the given root directory.
func Load(medium io.Medium, root string) (*Manifest, error) {
path := filepath.Join(root, manifestPath)
data, err := medium.Read(path)
if err != nil {
return nil, fmt.Errorf("manifest.Load: %w", err)
}
return Parse([]byte(data))
}
// LoadVerified reads, parses, and verifies the ed25519 signature.
func LoadVerified(medium io.Medium, root string, pub ed25519.PublicKey) (*Manifest, error) {
m, err := Load(medium, root)
if err != nil {
return nil, err
}
ok, err := Verify(m, pub)
if err != nil {
return nil, fmt.Errorf("manifest.LoadVerified: %w", err)
}
if !ok {
return nil, fmt.Errorf("manifest.LoadVerified: signature verification failed for %q", m.Code)
}
return m, nil
}

View file

@ -1,63 +0,0 @@
package manifest
import (
"crypto/ed25519"
"testing"
"forge.lthn.ai/core/go/pkg/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLoad_Good(t *testing.T) {
fs := io.NewMockMedium()
fs.Files[".core/view.yml"] = `
code: test-app
name: Test App
version: 1.0.0
layout: HLCRF
slots:
C: main-content
`
m, err := Load(fs, ".")
require.NoError(t, err)
assert.Equal(t, "test-app", m.Code)
assert.Equal(t, "main-content", m.Slots["C"])
}
func TestLoad_Bad_NoManifest(t *testing.T) {
fs := io.NewMockMedium()
_, err := Load(fs, ".")
assert.Error(t, err)
}
func TestLoadVerified_Good(t *testing.T) {
pub, priv, _ := ed25519.GenerateKey(nil)
m := &Manifest{
Code: "signed-app", Name: "Signed", Version: "1.0.0",
Layout: "HLCRF", Slots: map[string]string{"C": "main"},
}
_ = Sign(m, priv)
raw, _ := marshalYAML(m)
fs := io.NewMockMedium()
fs.Files[".core/view.yml"] = string(raw)
loaded, err := LoadVerified(fs, ".", pub)
require.NoError(t, err)
assert.Equal(t, "signed-app", loaded.Code)
}
func TestLoadVerified_Bad_Tampered(t *testing.T) {
pub, priv, _ := ed25519.GenerateKey(nil)
m := &Manifest{Code: "app", Version: "1.0.0"}
_ = Sign(m, priv)
raw, _ := marshalYAML(m)
tampered := "code: evil\n" + string(raw)[6:]
fs := io.NewMockMedium()
fs.Files[".core/view.yml"] = tampered
_, err := LoadVerified(fs, ".", pub)
assert.Error(t, err)
}

View file

@ -1,50 +0,0 @@
package manifest
import (
"fmt"
"gopkg.in/yaml.v3"
)
// Manifest represents a .core/view.yml application manifest.
type Manifest struct {
Code string `yaml:"code"`
Name string `yaml:"name"`
Version string `yaml:"version"`
Sign string `yaml:"sign"`
Layout string `yaml:"layout"`
Slots map[string]string `yaml:"slots"`
Permissions Permissions `yaml:"permissions"`
Modules []string `yaml:"modules"`
}
// Permissions declares the I/O capabilities a module requires.
type Permissions struct {
Read []string `yaml:"read"`
Write []string `yaml:"write"`
Net []string `yaml:"net"`
Run []string `yaml:"run"`
}
// Parse decodes YAML bytes into a Manifest.
func Parse(data []byte) (*Manifest, error) {
var m Manifest
if err := yaml.Unmarshal(data, &m); err != nil {
return nil, fmt.Errorf("manifest.Parse: %w", err)
}
return &m, nil
}
// SlotNames returns a deduplicated list of component names from slots.
func (m *Manifest) SlotNames() []string {
seen := make(map[string]bool)
var names []string
for _, name := range m.Slots {
if !seen[name] {
seen[name] = true
names = append(names, name)
}
}
return names
}

View file

@ -1,65 +0,0 @@
package manifest
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParse_Good(t *testing.T) {
raw := `
code: photo-browser
name: Photo Browser
version: 0.1.0
sign: dGVzdHNpZw==
layout: HLCRF
slots:
H: nav-breadcrumb
L: folder-tree
C: photo-grid
R: metadata-panel
F: status-bar
permissions:
read: ["./photos/"]
write: []
net: []
run: []
modules:
- core/media
- core/fs
`
m, err := Parse([]byte(raw))
require.NoError(t, err)
assert.Equal(t, "photo-browser", m.Code)
assert.Equal(t, "Photo Browser", m.Name)
assert.Equal(t, "0.1.0", m.Version)
assert.Equal(t, "dGVzdHNpZw==", m.Sign)
assert.Equal(t, "HLCRF", m.Layout)
assert.Equal(t, "nav-breadcrumb", m.Slots["H"])
assert.Equal(t, "photo-grid", m.Slots["C"])
assert.Len(t, m.Permissions.Read, 1)
assert.Equal(t, "./photos/", m.Permissions.Read[0])
assert.Len(t, m.Modules, 2)
}
func TestParse_Bad(t *testing.T) {
_, err := Parse([]byte("not: valid: yaml: ["))
assert.Error(t, err)
}
func TestManifest_SlotNames_Good(t *testing.T) {
m := Manifest{
Slots: map[string]string{
"H": "nav-bar",
"C": "main-content",
},
}
names := m.SlotNames()
assert.Contains(t, names, "nav-bar")
assert.Contains(t, names, "main-content")
assert.Len(t, names, 2)
}

View file

@ -1,43 +0,0 @@
package manifest
import (
"crypto/ed25519"
"encoding/base64"
"fmt"
"gopkg.in/yaml.v3"
)
// signable returns the canonical bytes to sign (manifest without sign field).
func signable(m *Manifest) ([]byte, error) {
tmp := *m
tmp.Sign = ""
return yaml.Marshal(&tmp)
}
// Sign computes the ed25519 signature and stores it in m.Sign (base64).
func Sign(m *Manifest, priv ed25519.PrivateKey) error {
msg, err := signable(m)
if err != nil {
return fmt.Errorf("manifest.Sign: marshal: %w", err)
}
sig := ed25519.Sign(priv, msg)
m.Sign = base64.StdEncoding.EncodeToString(sig)
return nil
}
// Verify checks the ed25519 signature in m.Sign against the public key.
func Verify(m *Manifest, pub ed25519.PublicKey) (bool, error) {
if m.Sign == "" {
return false, fmt.Errorf("manifest.Verify: no signature present")
}
sig, err := base64.StdEncoding.DecodeString(m.Sign)
if err != nil {
return false, fmt.Errorf("manifest.Verify: decode: %w", err)
}
msg, err := signable(m)
if err != nil {
return false, fmt.Errorf("manifest.Verify: marshal: %w", err)
}
return ed25519.Verify(pub, msg, sig), nil
}

View file

@ -1,51 +0,0 @@
package manifest
import (
"crypto/ed25519"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSignAndVerify_Good(t *testing.T) {
pub, priv, err := ed25519.GenerateKey(nil)
require.NoError(t, err)
m := &Manifest{
Code: "test-app",
Name: "Test App",
Version: "1.0.0",
Layout: "HLCRF",
Slots: map[string]string{"C": "main"},
}
err = Sign(m, priv)
require.NoError(t, err)
assert.NotEmpty(t, m.Sign)
ok, err := Verify(m, pub)
require.NoError(t, err)
assert.True(t, ok)
}
func TestVerify_Bad_Tampered(t *testing.T) {
pub, priv, _ := ed25519.GenerateKey(nil)
m := &Manifest{Code: "test-app", Version: "1.0.0"}
_ = Sign(m, priv)
m.Code = "evil-app" // tamper
ok, err := Verify(m, pub)
require.NoError(t, err)
assert.False(t, ok)
}
func TestVerify_Bad_Unsigned(t *testing.T) {
pub, _, _ := ed25519.GenerateKey(nil)
m := &Manifest{Code: "test-app"}
ok, err := Verify(m, pub)
assert.Error(t, err)
assert.False(t, ok)
}

View file

@ -1,67 +0,0 @@
package marketplace
import (
"encoding/json"
"fmt"
"strings"
)
// Module is a marketplace entry pointing to a module's Git repo.
type Module struct {
Code string `json:"code"`
Name string `json:"name"`
Repo string `json:"repo"`
SignKey string `json:"sign_key"`
Category string `json:"category"`
}
// Index is the root marketplace catalog.
type Index struct {
Version int `json:"version"`
Modules []Module `json:"modules"`
Categories []string `json:"categories"`
}
// ParseIndex decodes a marketplace index.json.
func ParseIndex(data []byte) (*Index, error) {
var idx Index
if err := json.Unmarshal(data, &idx); err != nil {
return nil, fmt.Errorf("marketplace.ParseIndex: %w", err)
}
return &idx, nil
}
// Search returns modules matching the query in code, name, or category.
func (idx *Index) Search(query string) []Module {
q := strings.ToLower(query)
var results []Module
for _, m := range idx.Modules {
if strings.Contains(strings.ToLower(m.Code), q) ||
strings.Contains(strings.ToLower(m.Name), q) ||
strings.Contains(strings.ToLower(m.Category), q) {
results = append(results, m)
}
}
return results
}
// ByCategory returns all modules in the given category.
func (idx *Index) ByCategory(category string) []Module {
var results []Module
for _, m := range idx.Modules {
if m.Category == category {
results = append(results, m)
}
}
return results
}
// Find returns the module with the given code, or false if not found.
func (idx *Index) Find(code string) (Module, bool) {
for _, m := range idx.Modules {
if m.Code == code {
return m, true
}
}
return Module{}, false
}

View file

@ -1,65 +0,0 @@
package marketplace
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParseIndex_Good(t *testing.T) {
raw := `{
"version": 1,
"modules": [
{"code": "mining-xmrig", "name": "XMRig Miner", "repo": "https://forge.lthn.io/host-uk/mod-xmrig.git", "sign_key": "abc123", "category": "miner"},
{"code": "utils-cyberchef", "name": "CyberChef", "repo": "https://forge.lthn.io/host-uk/mod-cyberchef.git", "sign_key": "def456", "category": "utils"}
],
"categories": ["miner", "utils"]
}`
idx, err := ParseIndex([]byte(raw))
require.NoError(t, err)
assert.Equal(t, 1, idx.Version)
assert.Len(t, idx.Modules, 2)
assert.Equal(t, "mining-xmrig", idx.Modules[0].Code)
}
func TestSearch_Good(t *testing.T) {
idx := &Index{
Modules: []Module{
{Code: "mining-xmrig", Name: "XMRig Miner", Category: "miner"},
{Code: "utils-cyberchef", Name: "CyberChef", Category: "utils"},
},
}
results := idx.Search("miner")
assert.Len(t, results, 1)
assert.Equal(t, "mining-xmrig", results[0].Code)
}
func TestByCategory_Good(t *testing.T) {
idx := &Index{
Modules: []Module{
{Code: "a", Category: "miner"},
{Code: "b", Category: "utils"},
{Code: "c", Category: "miner"},
},
}
miners := idx.ByCategory("miner")
assert.Len(t, miners, 2)
}
func TestFind_Good(t *testing.T) {
idx := &Index{
Modules: []Module{
{Code: "mining-xmrig", Name: "XMRig"},
},
}
m, ok := idx.Find("mining-xmrig")
assert.True(t, ok)
assert.Equal(t, "XMRig", m.Name)
}
func TestFind_Bad_NotFound(t *testing.T) {
idx := &Index{}
_, ok := idx.Find("nope")
assert.False(t, ok)
}

pkg/process/supervisor.go (new file, 470 lines)
View file

@ -0,0 +1,470 @@
package process
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
)
// RestartPolicy configures automatic restart behaviour for supervised units.
type RestartPolicy struct {
// Delay between restart attempts.
Delay time.Duration
// MaxRestarts is the maximum number of restarts before giving up.
// Use -1 for unlimited restarts.
MaxRestarts int
}
// DaemonSpec defines a long-running external process under supervision.
type DaemonSpec struct {
// Name identifies this daemon (must be unique within the supervisor).
Name string
// RunOptions defines the command, args, dir, env.
RunOptions
// Restart configures automatic restart behaviour.
Restart RestartPolicy
}
// GoSpec defines a supervised Go function that runs in a goroutine.
// The function should block until done or ctx is cancelled.
type GoSpec struct {
// Name identifies this task (must be unique within the supervisor).
Name string
// Func is the function to supervise. It receives a context that is
// cancelled when the supervisor stops or the task is explicitly stopped.
// If it returns an error or panics, the supervisor restarts it
// according to the restart policy.
Func func(ctx context.Context) error
// Restart configures automatic restart behaviour.
Restart RestartPolicy
}
// DaemonStatus contains a snapshot of a supervised unit's state.
type DaemonStatus struct {
Name string `json:"name"`
Type string `json:"type"` // "process" or "goroutine"
Running bool `json:"running"`
PID int `json:"pid,omitempty"`
RestartCount int `json:"restartCount"`
LastStart time.Time `json:"lastStart"`
Uptime time.Duration `json:"uptime"`
ExitCode int `json:"exitCode,omitempty"`
}
// supervisedUnit is the internal state for any supervised unit.
type supervisedUnit struct {
name string
unitType string // "process" or "goroutine"
restart RestartPolicy
restartCount int
lastStart time.Time
running bool
exitCode int
// For process daemons
runOpts *RunOptions
proc *Process
// For go functions
goFunc func(ctx context.Context) error
cancel context.CancelFunc
done chan struct{} // closed when supervision goroutine exits
mu sync.Mutex
}
func (u *supervisedUnit) status() DaemonStatus {
u.mu.Lock()
defer u.mu.Unlock()
var uptime time.Duration
if u.running && !u.lastStart.IsZero() {
uptime = time.Since(u.lastStart)
}
pid := 0
if u.proc != nil {
info := u.proc.Info()
pid = info.PID
}
return DaemonStatus{
Name: u.name,
Type: u.unitType,
Running: u.running,
PID: pid,
RestartCount: u.restartCount,
LastStart: u.lastStart,
Uptime: uptime,
ExitCode: u.exitCode,
}
}
// ShutdownTimeout is the maximum time to wait for supervised units during shutdown.
const ShutdownTimeout = 15 * time.Second
// Supervisor manages long-running processes and goroutines with automatic restart.
//
// For external processes, it requires a Service instance.
// For Go functions, no Service is needed.
//
// sup := process.NewSupervisor(svc)
// sup.Register(process.DaemonSpec{
// Name: "worker",
// RunOptions: process.RunOptions{Command: "worker", Args: []string{"--port", "8080"}},
// Restart: process.RestartPolicy{Delay: 5 * time.Second, MaxRestarts: -1},
// })
// sup.RegisterFunc(process.GoSpec{
// Name: "health-check",
// Func: healthCheckLoop,
// Restart: process.RestartPolicy{Delay: time.Second, MaxRestarts: -1},
// })
// sup.Start()
// defer sup.Stop()
type Supervisor struct {
service *Service
units map[string]*supervisedUnit
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
started bool
}
// NewSupervisor creates a supervisor.
// The Service parameter is optional (nil) if only supervising Go functions.
func NewSupervisor(svc *Service) *Supervisor {
ctx, cancel := context.WithCancel(context.Background())
return &Supervisor{
service: svc,
units: make(map[string]*supervisedUnit),
ctx: ctx,
cancel: cancel,
}
}
// Register adds an external process daemon for supervision.
// Panics if no Service was provided to NewSupervisor.
func (s *Supervisor) Register(spec DaemonSpec) {
if s.service == nil {
panic("process: Supervisor.Register requires a Service (use NewSupervisor with non-nil service)")
}
s.mu.Lock()
defer s.mu.Unlock()
opts := spec.RunOptions
s.units[spec.Name] = &supervisedUnit{
name: spec.Name,
unitType: "process",
restart: spec.Restart,
runOpts: &opts,
}
}
// RegisterFunc adds a Go function for supervision.
func (s *Supervisor) RegisterFunc(spec GoSpec) {
s.mu.Lock()
defer s.mu.Unlock()
s.units[spec.Name] = &supervisedUnit{
name: spec.Name,
unitType: "goroutine",
restart: spec.Restart,
goFunc: spec.Func,
}
}
// Start begins supervising all registered units.
// Safe to call once — subsequent calls are no-ops.
func (s *Supervisor) Start() {
s.mu.Lock()
if s.started {
s.mu.Unlock()
return
}
s.started = true
s.mu.Unlock()
s.mu.RLock()
for _, unit := range s.units {
s.startUnit(unit)
}
s.mu.RUnlock()
}
// startUnit launches the supervision goroutine for a single unit.
func (s *Supervisor) startUnit(u *supervisedUnit) {
u.mu.Lock()
if u.running {
u.mu.Unlock()
return
}
u.running = true
u.lastStart = time.Now()
unitCtx, unitCancel := context.WithCancel(s.ctx)
u.cancel = unitCancel
u.done = make(chan struct{})
u.mu.Unlock()
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer close(u.done)
s.superviseLoop(u, unitCtx)
}()
slog.Info("supervisor: started unit", "name", u.name, "type", u.unitType)
}
// superviseLoop is the core restart loop for a supervised unit.
// ctx is the unit's own context, derived from s.ctx. Cancelling either
// the supervisor or the unit's context exits this loop.
func (s *Supervisor) superviseLoop(u *supervisedUnit, ctx context.Context) {
for {
// Check if this unit's context is cancelled (covers both
// supervisor shutdown and manual restart/stop)
select {
case <-ctx.Done():
u.mu.Lock()
u.running = false
u.mu.Unlock()
return
default:
}
// Run the unit with panic recovery
exitCode := s.runUnit(u, ctx)
// If context was cancelled during run, exit the loop
if ctx.Err() != nil {
u.mu.Lock()
u.running = false
u.mu.Unlock()
return
}
u.mu.Lock()
u.exitCode = exitCode
u.restartCount++
shouldRestart := u.restart.MaxRestarts < 0 || u.restartCount <= u.restart.MaxRestarts
delay := u.restart.Delay
count := u.restartCount
u.mu.Unlock()
if !shouldRestart {
slog.Warn("supervisor: unit reached max restarts",
"name", u.name,
"maxRestarts", u.restart.MaxRestarts,
)
u.mu.Lock()
u.running = false
u.mu.Unlock()
return
}
// Wait before restarting, or exit if context is cancelled
select {
case <-ctx.Done():
u.mu.Lock()
u.running = false
u.mu.Unlock()
return
case <-time.After(delay):
slog.Info("supervisor: restarting unit",
"name", u.name,
"restartCount", count,
"exitCode", exitCode,
)
u.mu.Lock()
u.lastStart = time.Now()
u.mu.Unlock()
}
}
}
// runUnit executes a single run of the unit, returning exit code.
// Recovers from panics.
func (s *Supervisor) runUnit(u *supervisedUnit, ctx context.Context) (exitCode int) {
defer func() {
if r := recover(); r != nil {
slog.Error("supervisor: unit panicked",
"name", u.name,
"panic", fmt.Sprintf("%v", r),
)
exitCode = 1
}
}()
switch u.unitType {
case "process":
return s.runProcess(u, ctx)
case "goroutine":
return s.runGoFunc(u, ctx)
default:
slog.Error("supervisor: unknown unit type", "name", u.name, "type", u.unitType)
return 1
}
}
// runProcess starts an external process and waits for it to exit.
func (s *Supervisor) runProcess(u *supervisedUnit, ctx context.Context) int {
proc, err := s.service.StartWithOptions(ctx, *u.runOpts)
if err != nil {
slog.Error("supervisor: failed to start process",
"name", u.name,
"error", err,
)
return 1
}
u.mu.Lock()
u.proc = proc
u.mu.Unlock()
// Wait for process to finish or context cancellation
select {
case <-proc.Done():
info := proc.Info()
return info.ExitCode
case <-ctx.Done():
// Context cancelled — kill the process
_ = proc.Kill()
<-proc.Done()
return -1
}
}
// runGoFunc runs a Go function and returns 0 on success, 1 on error.
func (s *Supervisor) runGoFunc(u *supervisedUnit, ctx context.Context) int {
if err := u.goFunc(ctx); err != nil {
if ctx.Err() != nil {
// Context was cancelled, not a real error
return -1
}
slog.Error("supervisor: go function returned error",
"name", u.name,
"error", err,
)
return 1
}
return 0
}
// Stop gracefully shuts down all supervised units.
func (s *Supervisor) Stop() {
s.cancel()
// Wait with timeout
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
slog.Info("supervisor: all units stopped")
case <-time.After(ShutdownTimeout):
slog.Warn("supervisor: shutdown timeout, some units may not have stopped")
}
s.mu.Lock()
s.started = false
s.mu.Unlock()
}
// Restart stops and restarts a specific unit by name.
func (s *Supervisor) Restart(name string) error {
s.mu.RLock()
u, ok := s.units[name]
s.mu.RUnlock()
if !ok {
return fmt.Errorf("supervisor: unit not found: %s", name)
}
// Cancel the current run and wait for the supervision goroutine to exit
u.mu.Lock()
if u.cancel != nil {
u.cancel()
}
done := u.done
u.mu.Unlock()
// Wait for the old supervision goroutine to exit
if done != nil {
<-done
}
// Reset restart counter for the fresh start
u.mu.Lock()
u.restartCount = 0
u.mu.Unlock()
// Start fresh
s.startUnit(u)
return nil
}
// StopUnit stops a specific unit without restarting it.
func (s *Supervisor) StopUnit(name string) error {
s.mu.RLock()
u, ok := s.units[name]
s.mu.RUnlock()
if !ok {
return fmt.Errorf("supervisor: unit not found: %s", name)
}
u.mu.Lock()
if u.cancel != nil {
u.cancel()
}
// Set max restarts to 0 to prevent the loop from restarting
u.restart.MaxRestarts = 0
u.restartCount = 1
u.mu.Unlock()
return nil
}
// Status returns the status of a specific supervised unit.
func (s *Supervisor) Status(name string) (DaemonStatus, error) {
s.mu.RLock()
u, ok := s.units[name]
s.mu.RUnlock()
if !ok {
return DaemonStatus{}, fmt.Errorf("supervisor: unit not found: %s", name)
}
return u.status(), nil
}
// Statuses returns the status of all supervised units.
func (s *Supervisor) Statuses() map[string]DaemonStatus {
s.mu.RLock()
defer s.mu.RUnlock()
result := make(map[string]DaemonStatus, len(s.units))
for name, u := range s.units {
result[name] = u.status()
}
return result
}
// UnitNames returns the names of all registered units.
func (s *Supervisor) UnitNames() []string {
s.mu.RLock()
defer s.mu.RUnlock()
names := make([]string, 0, len(s.units))
for name := range s.units {
names = append(names, name)
}
return names
}

View file

@ -0,0 +1,335 @@
package process
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
)
func TestSupervisor_GoFunc_Good(t *testing.T) {
sup := NewSupervisor(nil)
var count atomic.Int32
sup.RegisterFunc(GoSpec{
Name: "counter",
Func: func(ctx context.Context) error {
count.Add(1)
<-ctx.Done()
return nil
},
Restart: RestartPolicy{Delay: 10 * time.Millisecond, MaxRestarts: -1},
})
sup.Start()
time.Sleep(50 * time.Millisecond)
status, err := sup.Status("counter")
if err != nil {
t.Fatal(err)
}
if !status.Running {
t.Error("expected counter to be running")
}
if status.Type != "goroutine" {
t.Errorf("expected type goroutine, got %s", status.Type)
}
sup.Stop()
if c := count.Load(); c < 1 {
t.Errorf("expected counter >= 1, got %d", c)
}
}
func TestSupervisor_GoFunc_Restart_Good(t *testing.T) {
sup := NewSupervisor(nil)
var runs atomic.Int32
sup.RegisterFunc(GoSpec{
Name: "crasher",
Func: func(ctx context.Context) error {
n := runs.Add(1)
if n <= 3 {
return fmt.Errorf("crash #%d", n)
}
// After 3 crashes, stay running
<-ctx.Done()
return nil
},
Restart: RestartPolicy{Delay: 5 * time.Millisecond, MaxRestarts: -1},
})
sup.Start()
// Wait for restarts
time.Sleep(200 * time.Millisecond)
status, _ := sup.Status("crasher")
if status.RestartCount < 3 {
t.Errorf("expected at least 3 restarts, got %d", status.RestartCount)
}
if !status.Running {
t.Error("expected crasher to be running after recovering")
}
sup.Stop()
}
func TestSupervisor_GoFunc_MaxRestarts_Good(t *testing.T) {
sup := NewSupervisor(nil)
sup.RegisterFunc(GoSpec{
Name: "limited",
Func: func(ctx context.Context) error {
return fmt.Errorf("always fail")
},
Restart: RestartPolicy{Delay: 5 * time.Millisecond, MaxRestarts: 2},
})
sup.Start()
time.Sleep(200 * time.Millisecond)
status, _ := sup.Status("limited")
if status.Running {
t.Error("expected limited to have stopped after max restarts")
}
// The function runs once (initial) + 2 restarts = restartCount should be 3
// (restartCount increments each time the function exits)
if status.RestartCount > 3 {
t.Errorf("expected restartCount <= 3, got %d", status.RestartCount)
}
sup.Stop()
}
func TestSupervisor_GoFunc_Panic_Good(t *testing.T) {
sup := NewSupervisor(nil)
var runs atomic.Int32
sup.RegisterFunc(GoSpec{
Name: "panicker",
Func: func(ctx context.Context) error {
n := runs.Add(1)
if n == 1 {
panic("boom")
}
<-ctx.Done()
return nil
},
Restart: RestartPolicy{Delay: 5 * time.Millisecond, MaxRestarts: 3},
})
sup.Start()
time.Sleep(100 * time.Millisecond)
status, _ := sup.Status("panicker")
if !status.Running {
t.Error("expected panicker to recover and be running")
}
if runs.Load() < 2 {
t.Error("expected at least 2 runs (1 panic + 1 recovery)")
}
sup.Stop()
}
func TestSupervisor_Statuses_Good(t *testing.T) {
sup := NewSupervisor(nil)
sup.RegisterFunc(GoSpec{
Name: "a",
Func: func(ctx context.Context) error { <-ctx.Done(); return nil },
Restart: RestartPolicy{MaxRestarts: -1},
})
sup.RegisterFunc(GoSpec{
Name: "b",
Func: func(ctx context.Context) error { <-ctx.Done(); return nil },
Restart: RestartPolicy{MaxRestarts: -1},
})
sup.Start()
time.Sleep(50 * time.Millisecond)
statuses := sup.Statuses()
if len(statuses) != 2 {
t.Errorf("expected 2 statuses, got %d", len(statuses))
}
if !statuses["a"].Running || !statuses["b"].Running {
t.Error("expected both units running")
}
sup.Stop()
}
func TestSupervisor_UnitNames_Good(t *testing.T) {
sup := NewSupervisor(nil)
sup.RegisterFunc(GoSpec{
Name: "alpha",
Func: func(ctx context.Context) error { <-ctx.Done(); return nil },
})
sup.RegisterFunc(GoSpec{
Name: "beta",
Func: func(ctx context.Context) error { <-ctx.Done(); return nil },
})
names := sup.UnitNames()
if len(names) != 2 {
t.Errorf("expected 2 names, got %d", len(names))
}
}
func TestSupervisor_Status_Bad(t *testing.T) {
sup := NewSupervisor(nil)
_, err := sup.Status("nonexistent")
if err == nil {
t.Error("expected error for nonexistent unit")
}
}
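// TestSupervisor_Restart_Good verifies that Restart stops a running unit and
// starts it again, so the function body runs at least twice.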
func TestSupervisor_Restart_Good(t *testing.T) {
sup := NewSupervisor(nil)
var runs atomic.Int32
sup.RegisterFunc(GoSpec{
Name: "restartable",
Func: func(ctx context.Context) error {
runs.Add(1)
<-ctx.Done()
return nil
},
Restart: RestartPolicy{Delay: 5 * time.Millisecond, MaxRestarts: -1},
})
sup.Start()
time.Sleep(50 * time.Millisecond)
if err := sup.Restart("restartable"); err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
if runs.Load() < 2 {
t.Errorf("expected at least 2 runs after restart, got %d", runs.Load())
}
sup.Stop()
}
func TestSupervisor_Restart_Bad(t *testing.T) {
sup := NewSupervisor(nil)
err := sup.Restart("nonexistent")
if err == nil {
t.Error("expected error for nonexistent unit")
}
}
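// TestSupervisor_StopUnit_Good verifies that StopUnit halts a single unit and
// that an explicit stop does not trigger the restart policy.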
func TestSupervisor_StopUnit_Good(t *testing.T) {
sup := NewSupervisor(nil)
sup.RegisterFunc(GoSpec{
Name: "stoppable",
Func: func(ctx context.Context) error {
<-ctx.Done()
return nil
},
Restart: RestartPolicy{Delay: 5 * time.Millisecond, MaxRestarts: -1},
})
sup.Start()
time.Sleep(50 * time.Millisecond)
if err := sup.StopUnit("stoppable"); err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
status, _ := sup.Status("stoppable")
if status.Running {
t.Error("expected unit to be stopped")
}
sup.Stop()
}
func TestSupervisor_StopUnit_Bad(t *testing.T) {
sup := NewSupervisor(nil)
err := sup.StopUnit("nonexistent")
if err == nil {
t.Error("expected error for nonexistent unit")
}
}
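// TestSupervisor_StartIdempotent_Good verifies that calling Start more than
// once is a no-op: each unit's function is launched exactly once.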
func TestSupervisor_StartIdempotent_Good(t *testing.T) {
sup := NewSupervisor(nil)
var count atomic.Int32
sup.RegisterFunc(GoSpec{
Name: "once",
Func: func(ctx context.Context) error {
count.Add(1)
<-ctx.Done()
return nil
},
})
sup.Start()
sup.Start() // Should be no-op
sup.Start() // Should be no-op
time.Sleep(50 * time.Millisecond)
if count.Load() != 1 {
t.Errorf("expected exactly 1 run, got %d", count.Load())
}
sup.Stop()
}
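// TestSupervisor_NoRestart_Good verifies that with MaxRestarts set to 0 a
// unit that exits is not restarted.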
func TestSupervisor_NoRestart_Good(t *testing.T) {
sup := NewSupervisor(nil)
var runs atomic.Int32
sup.RegisterFunc(GoSpec{
Name: "oneshot",
Func: func(ctx context.Context) error {
runs.Add(1)
return nil // Exit immediately
},
Restart: RestartPolicy{Delay: 5 * time.Millisecond, MaxRestarts: 0},
})
sup.Start()
time.Sleep(100 * time.Millisecond)
status, _ := sup.Status("oneshot")
if status.Running {
t.Error("expected oneshot to not be running")
}
// Should run once (the initial run) and then stop: restartCount increments to 1
// when that run exits, but with MaxRestarts 0 no restart follows.
if runs.Load() != 1 {
t.Errorf("expected exactly 1 run, got %d", runs.Load())
}
sup.Stop()
}
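// TestSupervisor_Register_Ugly verifies that registering a DaemonSpec with no
// service attached is treated as a programming error and panics.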
func TestSupervisor_Register_Ugly(t *testing.T) {
sup := NewSupervisor(nil)
defer func() {
if r := recover(); r == nil {
t.Error("expected panic when registering process daemon without service")
}
}()
sup.Register(DaemonSpec{
Name: "will-panic",
RunOptions: RunOptions{Command: "echo"},
})
}

View file

@ -1,124 +0,0 @@
package store
import (
"database/sql"
"fmt"
"strings"
"text/template"
_ "modernc.org/sqlite"
)
// Store is a group-namespaced key-value store backed by SQLite.
type Store struct {
db *sql.DB
}
// New creates a Store at the given SQLite path. Use ":memory:" for tests.
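// The database is opened in WAL journal mode so concurrent reads do not block writes.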
func New(dbPath string) (*Store, error) {
db, err := sql.Open("sqlite", dbPath)
if err != nil {
return nil, fmt.Errorf("store.New: %w", err)
}
if _, err := db.Exec("PRAGMA journal_mode=WAL"); err != nil {
db.Close()
return nil, fmt.Errorf("store.New: WAL: %w", err)
}
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS kv (
grp TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (grp, key)
)`); err != nil {
db.Close()
return nil, fmt.Errorf("store.New: schema: %w", err)
}
return &Store{db: db}, nil
}
// Close closes the underlying database.
func (s *Store) Close() error {
return s.db.Close()
}
// Get retrieves a value by group and key.
func (s *Store) Get(group, key string) (string, error) {
var val string
err := s.db.QueryRow("SELECT value FROM kv WHERE grp = ? AND key = ?", group, key).Scan(&val)
if err == sql.ErrNoRows {
return "", fmt.Errorf("store.Get: not found: %s/%s", group, key)
}
if err != nil {
return "", fmt.Errorf("store.Get: %w", err)
}
return val, nil
}
// Set stores a value by group and key, overwriting if exists.
func (s *Store) Set(group, key, value string) error {
_, err := s.db.Exec(
`INSERT INTO kv (grp, key, value) VALUES (?, ?, ?)
ON CONFLICT(grp, key) DO UPDATE SET value = excluded.value`,
group, key, value,
)
if err != nil {
return fmt.Errorf("store.Set: %w", err)
}
return nil
}
// Delete removes a single key from a group.
func (s *Store) Delete(group, key string) error {
_, err := s.db.Exec("DELETE FROM kv WHERE grp = ? AND key = ?", group, key)
if err != nil {
return fmt.Errorf("store.Delete: %w", err)
}
return nil
}
// Count returns the number of keys in a group.
func (s *Store) Count(group string) (int, error) {
var n int
err := s.db.QueryRow("SELECT COUNT(*) FROM kv WHERE grp = ?", group).Scan(&n)
if err != nil {
return 0, fmt.Errorf("store.Count: %w", err)
}
return n, nil
}
// DeleteGroup removes all keys in a group.
func (s *Store) DeleteGroup(group string) error {
_, err := s.db.Exec("DELETE FROM kv WHERE grp = ?", group)
if err != nil {
return fmt.Errorf("store.DeleteGroup: %w", err)
}
return nil
}
// Render loads all key-value pairs from a group and renders a Go template.
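// Each key in the group becomes a template variable, so {{ .pool }} renders
// the value stored under "pool".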
func (s *Store) Render(tmplStr, group string) (string, error) {
rows, err := s.db.Query("SELECT key, value FROM kv WHERE grp = ?", group)
if err != nil {
return "", fmt.Errorf("store.Render: query: %w", err)
}
defer rows.Close()
vars := make(map[string]string)
for rows.Next() {
var k, v string
if err := rows.Scan(&k, &v); err != nil {
return "", fmt.Errorf("store.Render: scan: %w", err)
}
vars[k] = v
}
tmpl, err := template.New("render").Parse(tmplStr)
if err != nil {
return "", fmt.Errorf("store.Render: parse: %w", err)
}
var b strings.Builder
if err := tmpl.Execute(&b, vars); err != nil {
return "", fmt.Errorf("store.Render: exec: %w", err)
}
return b.String(), nil
}

View file

@ -1,81 +0,0 @@
package store
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSetGet_Good(t *testing.T) {
s, err := New(":memory:")
require.NoError(t, err)
defer s.Close()
err = s.Set("config", "theme", "dark")
require.NoError(t, err)
val, err := s.Get("config", "theme")
require.NoError(t, err)
assert.Equal(t, "dark", val)
}
func TestGet_Bad_NotFound(t *testing.T) {
s, _ := New(":memory:")
defer s.Close()
_, err := s.Get("config", "missing")
assert.Error(t, err)
}
func TestDelete_Good(t *testing.T) {
s, _ := New(":memory:")
defer s.Close()
_ = s.Set("config", "key", "val")
err := s.Delete("config", "key")
require.NoError(t, err)
_, err = s.Get("config", "key")
assert.Error(t, err)
}
func TestCount_Good(t *testing.T) {
s, _ := New(":memory:")
defer s.Close()
_ = s.Set("grp", "a", "1")
_ = s.Set("grp", "b", "2")
_ = s.Set("other", "c", "3")
n, err := s.Count("grp")
require.NoError(t, err)
assert.Equal(t, 2, n)
}
func TestDeleteGroup_Good(t *testing.T) {
s, _ := New(":memory:")
defer s.Close()
_ = s.Set("grp", "a", "1")
_ = s.Set("grp", "b", "2")
err := s.DeleteGroup("grp")
require.NoError(t, err)
n, _ := s.Count("grp")
assert.Equal(t, 0, n)
}
func TestRender_Good(t *testing.T) {
s, _ := New(":memory:")
defer s.Close()
_ = s.Set("user", "pool", "pool.lthn.io:3333")
_ = s.Set("user", "wallet", "iz...")
tmpl := `{"pool":"{{ .pool }}","wallet":"{{ .wallet }}"}`
out, err := s.Render(tmpl, "user")
require.NoError(t, err)
assert.Contains(t, out, "pool.lthn.io:3333")
assert.Contains(t, out, "iz...")
}