cli/pkg/container/linuxkit.go
Snider 18f68ef907 refactor(core): decompose Core into serviceManager + messageBus (#282)
* refactor(core): decompose Core into serviceManager + messageBus (#215)

Extract two focused, unexported components from the Core "god object":

- serviceManager: owns service registry, lifecycle tracking (startables/
  stoppables), and service lock
- messageBus: owns IPC action dispatch, query handling, and task handling

All public API methods on Core become one-line delegation wrappers.
Zero consumer changes — no files outside pkg/framework/core/ modified.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
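A minimal sketch of the delegation shape described above, under simplified assumptions. The type names `serviceManager` and `messageBus` come from the commit. Everything else is hypothetical and chosen only for illustration, not taken from the real pkg/framework/core API: the fields, `NewCore`, `RegisterService`, `Service`, `RegisterAction`, `Action`, and all signatures.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// serviceManager owns the service registry (hypothetical, simplified).
type serviceManager struct {
	mu       sync.RWMutex
	services map[string]any
}

func (sm *serviceManager) register(name string, svc any) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()
	if _, exists := sm.services[name]; exists {
		return fmt.Errorf("service already registered: %s", name)
	}
	sm.services[name] = svc
	return nil
}

func (sm *serviceManager) get(name string) (any, bool) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()
	svc, ok := sm.services[name]
	return svc, ok
}

// messageBus owns action dispatch (hypothetical, simplified).
type messageBus struct {
	mu       sync.RWMutex
	handlers []func(msg any) error
}

func (mb *messageBus) register(h func(msg any) error) {
	mb.mu.Lock()
	defer mb.mu.Unlock()
	mb.handlers = append(mb.handlers, h)
}

// action delivers msg to every handler and joins any errors they return.
func (mb *messageBus) action(msg any) error {
	mb.mu.RLock()
	handlers := make([]func(msg any) error, len(mb.handlers))
	copy(handlers, mb.handlers) // snapshot so handlers may register more handlers
	mb.mu.RUnlock()

	var errs []error
	for _, h := range handlers {
		if err := h(msg); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// Core keeps its public surface; each method is a one-line delegation.
type Core struct {
	services *serviceManager
	bus      *messageBus
}

func NewCore() *Core {
	return &Core{
		services: &serviceManager{services: make(map[string]any)},
		bus:      &messageBus{},
	}
}

func (c *Core) RegisterService(name string, svc any) error { return c.services.register(name, svc) }
func (c *Core) Service(name string) (any, bool)            { return c.services.get(name) }
func (c *Core) RegisterAction(h func(msg any) error)       { c.bus.register(h) }
func (c *Core) Action(msg any) error                       { return c.bus.action(msg) }

func main() {
	c := NewCore()
	_ = c.RegisterService("greeter", "hello")
	c.RegisterAction(func(msg any) error { fmt.Println("got:", msg); return nil })
	svc, _ := c.Service("greeter")
	fmt.Println(svc, c.Action("ping"))
}
```

Because callers only ever see `Core`, the registry and the bus can change internally without touching consumers. This matches the "zero consumer changes" claim.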

* fix(core): remove unused fields from test struct

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(core): address review feedback from Gemini and Copilot

- Move locked check inside mutex in registerService to fix TOCTOU race
- Add mutex guards to enableLock and applyLock methods
- Replace fmt.Errorf with errors.Join in action() for correct error
  aggregation (consistent with queryAll and lifecycle methods)
- Add TestMessageBus_Action_Bad for error aggregation coverage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
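The TOCTOU fix amounts to reading the locked flag under the same mutex that guards the write. This is a hypothetical sketch. `registerService`, `enableLock` and `applyLock` are named in the commit, while the `registry` type is a stand-in. The two-step semantics are an assumption made only to give both methods something to guard: enable first, then apply.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for serviceManager, reduced to the
// lock flags and the map they protect.
type registry struct {
	mu          sync.Mutex
	lockEnabled bool // assumed: set by enableLock, takes effect on applyLock
	locked      bool // once true, further registrations are rejected
	services    map[string]any
}

// enableLock and applyLock mutate shared flags, so both take the mutex.
func (r *registry) enableLock() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.lockEnabled = true
}

func (r *registry) applyLock() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.lockEnabled {
		r.locked = true
	}
}

// registerService checks locked while holding the mutex. Checking it before
// Lock() would let applyLock run between the check and the map write.
func (r *registry) registerService(name string, svc any) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.locked {
		return fmt.Errorf("registry is locked; cannot register %s", name)
	}
	if r.services == nil {
		r.services = make(map[string]any)
	}
	r.services[name] = svc
	return nil
}

func main() {
	r := &registry{}
	fmt.Println(r.registerService("a", 1)) // <nil>
	r.enableLock()
	r.applyLock()
	fmt.Println(r.registerService("b", 2)) // registry is locked; cannot register b
}
```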

* ci(workflows): bump host-uk/build from v3 to v4

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ci(workflows): replace Wails build with Go CLI build

The build action doesn't yet support Wails v3. Comment out the GUI
build step and use host-uk/build/actions/setup/go for Go toolchain
setup with a plain `go build` for the CLI binary.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(container): check context before select in Stop to fix flaky test

Stop() now checks ctx.Err() before entering the select block. When a
pre-cancelled context is passed, the select could non-deterministically
choose <-done over <-ctx.Done() if the process had already exited,
causing TestLinuxKitManager_Stop_Good_ContextCancelled to fail on CI.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(ci): trim CodeQL matrix to valid languages

Remove javascript-typescript and actions from CodeQL matrix — this
repo contains only Go and Python. Invalid languages blocked SARIF
upload and prevented merge.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat(go): add `core go fuzz` command and wire into QA

- New `core go fuzz` command discovers Fuzz* targets and runs them
  with configurable --duration (default 10s per target)
- Fuzz added to default QA checks with 5s burst duration
- Seed fuzz targets for core package: FuzzE (error constructor),
  FuzzServiceRegistration, FuzzMessageDispatch

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ci(codeql): add workflow_dispatch trigger for manual runs

Allows manual triggering of CodeQL when the automatic pull_request
trigger doesn't fire.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ci(codeql): remove workflow in favour of default setup

CodeQL default setup is now enabled via repo settings for go and
python. The workflow-based approach uploaded results as "code quality"
rather than "code scanning", which didn't satisfy the code_scanning
ruleset requirement. Default setup handles this natively.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ci(workflows): add explicit permissions to all workflows

- agent-verify: add issues: write (was missing, writes comments/labels)
- ci: add contents: read (explicit least-privilege)
- coverage: add contents: read (explicit least-privilege)

All workflows now declare permissions explicitly. Repo default is
read-only, so workflows without a block silently lacked write access.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ci(workflows): replace inline logic with org reusable workflow callers

agent-verify.yml and auto-project.yml now delegate to centralised
reusable workflows in host-uk/.github, reducing per-repo duplication.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 13:40:16 +00:00


package container

import (
	"bufio"
	"context"
	"fmt"
	goio "io"
	"os"
	"os/exec"
	"syscall"
	"time"

	"github.com/host-uk/core/pkg/io"
)

// LinuxKitManager implements the Manager interface for LinuxKit VMs.
type LinuxKitManager struct {
	state      *State
	hypervisor Hypervisor
}

// NewLinuxKitManager creates a new LinuxKit manager with auto-detected hypervisor.
func NewLinuxKitManager() (*LinuxKitManager, error) {
	statePath, err := DefaultStatePath()
	if err != nil {
		return nil, fmt.Errorf("failed to determine state path: %w", err)
	}

	state, err := LoadState(statePath)
	if err != nil {
		return nil, fmt.Errorf("failed to load state: %w", err)
	}

	hypervisor, err := DetectHypervisor()
	if err != nil {
		return nil, err
	}

	return &LinuxKitManager{
		state:      state,
		hypervisor: hypervisor,
	}, nil
}

// NewLinuxKitManagerWithHypervisor creates a manager with a specific hypervisor.
func NewLinuxKitManagerWithHypervisor(state *State, hypervisor Hypervisor) *LinuxKitManager {
	return &LinuxKitManager{
		state:      state,
		hypervisor: hypervisor,
	}
}

// Run starts a new LinuxKit VM from the given image.
func (m *LinuxKitManager) Run(ctx context.Context, image string, opts RunOptions) (*Container, error) {
	// Validate image exists
	if !io.Local.IsFile(image) {
		return nil, fmt.Errorf("image not found: %s", image)
	}

	// Detect image format
	format := DetectImageFormat(image)
	if format == FormatUnknown {
		return nil, fmt.Errorf("unsupported image format: %s", image)
	}

	// Generate container ID
	id, err := GenerateID()
	if err != nil {
		return nil, fmt.Errorf("failed to generate container ID: %w", err)
	}

	// Apply defaults
	if opts.Memory <= 0 {
		opts.Memory = 1024
	}
	if opts.CPUs <= 0 {
		opts.CPUs = 1
	}
	if opts.SSHPort <= 0 {
		opts.SSHPort = 2222
	}

	// Use name or generate from ID
	name := opts.Name
	if name == "" {
		name = id[:8]
	}

	// Ensure logs directory exists
	if err := EnsureLogsDir(); err != nil {
		return nil, fmt.Errorf("failed to create logs directory: %w", err)
	}

	// Get log file path
	logPath, err := LogPath(id)
	if err != nil {
		return nil, fmt.Errorf("failed to determine log path: %w", err)
	}

	// Build hypervisor options
	hvOpts := &HypervisorOptions{
		Memory:  opts.Memory,
		CPUs:    opts.CPUs,
		LogFile: logPath,
		SSHPort: opts.SSHPort,
		Ports:   opts.Ports,
		Volumes: opts.Volumes,
		Detach:  opts.Detach,
	}

	// Build the command
	cmd, err := m.hypervisor.BuildCommand(ctx, image, hvOpts)
	if err != nil {
		return nil, fmt.Errorf("failed to build hypervisor command: %w", err)
	}

	// Create log file
	logFile, err := os.Create(logPath)
	if err != nil {
		return nil, fmt.Errorf("failed to create log file: %w", err)
	}

	// Create container record
	container := &Container{
		ID:        id,
		Name:      name,
		Image:     image,
		Status:    StatusRunning,
		StartedAt: time.Now(),
		Ports:     opts.Ports,
		Memory:    opts.Memory,
		CPUs:      opts.CPUs,
	}

	if opts.Detach {
		// Run in background
		cmd.Stdout = logFile
		cmd.Stderr = logFile

		// Start the process
		if err := cmd.Start(); err != nil {
			_ = logFile.Close()
			return nil, fmt.Errorf("failed to start VM: %w", err)
		}
		container.PID = cmd.Process.Pid

		// Save state
		if err := m.state.Add(container); err != nil {
			// Try to kill the process we just started
			_ = cmd.Process.Kill()
			_ = logFile.Close()
			return nil, fmt.Errorf("failed to save state: %w", err)
		}

		// Close log file handle (process has its own)
		_ = logFile.Close()

		// Start a goroutine to wait for process exit and update state
		go m.waitForExit(container.ID, cmd)

		return container, nil
	}

	// Run in foreground: tee output to both the log file and the terminal
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		_ = logFile.Close()
		return nil, fmt.Errorf("failed to get stdout pipe: %w", err)
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		_ = logFile.Close()
		return nil, fmt.Errorf("failed to get stderr pipe: %w", err)
	}

	if err := cmd.Start(); err != nil {
		_ = logFile.Close()
		return nil, fmt.Errorf("failed to start VM: %w", err)
	}
	container.PID = cmd.Process.Pid

	// Save state before waiting
	if err := m.state.Add(container); err != nil {
		_ = cmd.Process.Kill()
		_ = logFile.Close()
		return nil, fmt.Errorf("failed to save state: %w", err)
	}

	// Copy output to both log and terminal. cmd.Wait closes the pipes once
	// the process exits, so both copies must finish before Wait is called;
	// each copier signals completion on copyDone.
	copyDone := make(chan struct{}, 2)
	go func() {
		mw := goio.MultiWriter(logFile, os.Stdout)
		_, _ = goio.Copy(mw, stdout)
		copyDone <- struct{}{}
	}()
	go func() {
		mw := goio.MultiWriter(logFile, os.Stderr)
		_, _ = goio.Copy(mw, stderr)
		copyDone <- struct{}{}
	}()
	<-copyDone
	<-copyDone

	// Wait for the process to complete
	if err := cmd.Wait(); err != nil {
		container.Status = StatusError
	} else {
		container.Status = StatusStopped
	}

	_ = logFile.Close()
	if err := m.state.Update(container); err != nil {
		return container, fmt.Errorf("update container state: %w", err)
	}
	return container, nil
}

// waitForExit monitors a detached process and updates state when it exits.
func (m *LinuxKitManager) waitForExit(id string, cmd *exec.Cmd) {
	err := cmd.Wait()

	container, ok := m.state.Get(id)
	if ok {
		if err != nil {
			container.Status = StatusError
		} else {
			container.Status = StatusStopped
		}
		_ = m.state.Update(container)
	}
}

// Stop stops a running container by sending SIGTERM. If the process has not
// exited after 10 seconds, or if ctx is cancelled first, it sends SIGKILL.
func (m *LinuxKitManager) Stop(ctx context.Context, id string) error {
	container, ok := m.state.Get(id)
	if !ok {
		return fmt.Errorf("container not found: %s", id)
	}
	if container.Status != StatusRunning {
		return fmt.Errorf("container is not running: %s", id)
	}

	// Find the process
	process, err := os.FindProcess(container.PID)
	if err != nil {
		// Process doesn't exist, update state
		container.Status = StatusStopped
		_ = m.state.Update(container)
		return nil
	}

	// Send SIGTERM
	if err := process.Signal(syscall.SIGTERM); err != nil {
		// Process might already be gone
		container.Status = StatusStopped
		_ = m.state.Update(container)
		return nil
	}

	// Honour already-cancelled contexts before waiting: if ctx.Done() and
	// done were both ready, select would pick one at random.
	if err := ctx.Err(); err != nil {
		_ = process.Signal(syscall.SIGKILL)
		return err
	}

	// Wait for graceful shutdown with timeout. (*os.Process).Wait only works
	// for child processes; for a VM started by another invocation it returns
	// an error immediately, so done closes without waiting for the exit.
	done := make(chan struct{})
	go func() {
		_, _ = process.Wait()
		close(done)
	}()

	select {
	case <-done:
		// Wait returned: the process exited (or is not our child)
	case <-time.After(10 * time.Second):
		// Force kill
		_ = process.Signal(syscall.SIGKILL)
		<-done
	case <-ctx.Done():
		// Context cancelled
		_ = process.Signal(syscall.SIGKILL)
		return ctx.Err()
	}

	container.Status = StatusStopped
	return m.state.Update(container)
}

// List returns all known containers, verifying process state.
func (m *LinuxKitManager) List(ctx context.Context) ([]*Container, error) {
	containers := m.state.All()

	// Verify each running container's process is still alive
	for _, c := range containers {
		if c.Status == StatusRunning {
			if !isProcessRunning(c.PID) {
				c.Status = StatusStopped
				_ = m.state.Update(c)
			}
		}
	}

	return containers, nil
}

// isProcessRunning checks if a process with the given PID is still running.
func isProcessRunning(pid int) bool {
	process, err := os.FindProcess(pid)
	if err != nil {
		return false
	}

	// On Unix, FindProcess always succeeds, so we need to send signal 0 to check
	err = process.Signal(syscall.Signal(0))
	return err == nil
}

// Logs returns a reader for the container's log output.
func (m *LinuxKitManager) Logs(ctx context.Context, id string, follow bool) (goio.ReadCloser, error) {
	_, ok := m.state.Get(id)
	if !ok {
		return nil, fmt.Errorf("container not found: %s", id)
	}

	logPath, err := LogPath(id)
	if err != nil {
		return nil, fmt.Errorf("failed to determine log path: %w", err)
	}

	if !io.Local.IsFile(logPath) {
		return nil, fmt.Errorf("no logs available for container: %s", id)
	}

	if !follow {
		// Simple case: just open and return the file
		return os.Open(logPath)
	}

	// Follow mode: create a reader that tails the file
	return newFollowReader(ctx, logPath)
}

// followReader implements goio.ReadCloser for following log files.
type followReader struct {
	file   *os.File
	ctx    context.Context
	cancel context.CancelFunc
	reader *bufio.Reader
}

func newFollowReader(ctx context.Context, path string) (*followReader, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}

	// Seek to end
	_, _ = file.Seek(0, goio.SeekEnd)

	ctx, cancel := context.WithCancel(ctx)

	return &followReader{
		file:   file,
		ctx:    ctx,
		cancel: cancel,
		reader: bufio.NewReader(file),
	}, nil
}

func (f *followReader) Read(p []byte) (int, error) {
	for {
		select {
		case <-f.ctx.Done():
			return 0, goio.EOF
		default:
		}

		n, err := f.reader.Read(p)
		if n > 0 {
			return n, nil
		}
		if err != nil && err != goio.EOF {
			return 0, err
		}

		// No data available, wait a bit and try again
		select {
		case <-f.ctx.Done():
			return 0, goio.EOF
		case <-time.After(100 * time.Millisecond):
			// Reset reader to pick up new data
			f.reader.Reset(f.file)
		}
	}
}

func (f *followReader) Close() error {
	f.cancel()
	return f.file.Close()
}

// Exec executes a command inside the container via SSH.
func (m *LinuxKitManager) Exec(ctx context.Context, id string, cmd []string) error {
	container, ok := m.state.Get(id)
	if !ok {
		return fmt.Errorf("container not found: %s", id)
	}
	if container.Status != StatusRunning {
		return fmt.Errorf("container is not running: %s", id)
	}

	// Run does not store opts.SSHPort on the container record, so Exec
	// assumes the default port that Run applies (2222). A VM started with a
	// custom SSHPort will not be reached on that port here.
	sshPort := 2222

	// Build SSH command
	sshArgs := []string{
		"-p", fmt.Sprintf("%d", sshPort),
		"-o", "StrictHostKeyChecking=accept-new",
		"-o", "UserKnownHostsFile=~/.core/known_hosts",
		"-o", "LogLevel=ERROR",
		"root@localhost",
	}
	sshArgs = append(sshArgs, cmd...)

	sshCmd := exec.CommandContext(ctx, "ssh", sshArgs...)
	sshCmd.Stdin = os.Stdin
	sshCmd.Stdout = os.Stdout
	sshCmd.Stderr = os.Stderr

	return sshCmd.Run()
}

// State returns the manager's state (for testing).
func (m *LinuxKitManager) State() *State {
	return m.state
}

// Hypervisor returns the manager's hypervisor (for testing).
func (m *LinuxKitManager) Hypervisor() Hypervisor {
	return m.hypervisor
}