go-rocm/server.go
Claude 523abc6509
feat(ax): pass 2 — replace banned imports, rename variables, add AX comments
Replace fmt/strings/path/filepath/encoding/json with core equivalents throughout
all packages. Rename cfg→configuration, srv→server/subprocess, ftName→fileTypeName,
ctxSize→contextSize. Add usage-example doc-comments to every exported symbol.
Update all test names to TestSubject_Function_{Good,Bad,Ugly} convention.

Co-Authored-By: Virgil <virgil@lethean.io>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 08:24:34 +01:00

209 lines
5.3 KiB
Go

//go:build linux && amd64
package rocm
import (
	"context"
	"errors"
	"net"
	"os"
	"os/exec"
	"strconv"
	"syscall"
	"time"

	"dappco.re/go/core"
	coreerr "forge.lthn.ai/core/go-log"
	"forge.lthn.ai/core/go-rocm/internal/llamacpp"
)
// server manages a llama-server subprocess.
//
// Lifecycle: startServer launches the process and a waiter goroutine;
// the waiter records the exit error and closes exited when the process
// terminates. stop tears the process down.
type server struct {
	cmd    *exec.Cmd        // the running llama-server process
	port   int              // localhost TCP port the server listens on
	client *llamacpp.Client // HTTP client bound to http://127.0.0.1:<port>
	exited chan struct{}    // closed by the waiter goroutine when the process exits
	exitErr error           // safe to read only after <-exited
}
// alive reports whether the llama-server process is still running.
// It is a non-blocking check against the exited channel, which is
// closed once the process terminates.
func (s *server) alive() bool {
	running := true
	select {
	case <-s.exited:
		running = false
	default:
	}
	return running
}
// findLlamaServer locates the llama-server binary.
// The ROCM_LLAMA_SERVER_PATH environment variable takes precedence;
// otherwise the directories in PATH are searched.
//
//	path, err := findLlamaServer()
//	// path == "/usr/local/bin/llama-server"
func findLlamaServer() (string, error) {
	override := core.Env("ROCM_LLAMA_SERVER_PATH")
	if override == "" {
		// No override: fall back to a normal PATH lookup.
		found, err := exec.LookPath("llama-server")
		if err != nil {
			return "", coreerr.E("rocm.findLlamaServer", "llama-server not found in PATH", err)
		}
		return found, nil
	}
	// An explicit override must point at an existing file.
	if (&core.Fs{}).New("/").Exists(override) {
		return override, nil
	}
	return "", coreerr.E("rocm.findLlamaServer", "llama-server not found at ROCM_LLAMA_SERVER_PATH="+override, nil)
}
// freePort asks the kernel for a free TCP port on localhost by binding
// to port 0 and reading back the address that was actually assigned.
func freePort() (int, error) {
	probe, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, coreerr.E("rocm.freePort", "listen for free port", err)
	}
	defer probe.Close()
	return probe.Addr().(*net.TCPAddr).Port, nil
}
// serverEnv returns the environment for the llama-server subprocess.
// Any inherited HIP_VISIBLE_DEVICES entry is dropped and the variable
// is pinned to 0 to mask the iGPU.
// This is critical — the Ryzen 9 iGPU crashes llama-server if not masked.
func serverEnv() []string {
	parent := os.Environ()
	filtered := make([]string, 0, len(parent)+1)
	for _, kv := range parent {
		if !core.HasPrefix(kv, "HIP_VISIBLE_DEVICES=") {
			filtered = append(filtered, kv)
		}
	}
	return append(filtered, "HIP_VISIBLE_DEVICES=0")
}
// startServer spawns llama-server and waits for it to become ready.
// It selects a free port automatically, retrying up to 3 times if the
// process exits during startup (e.g. port conflict).
//
// gpuLayers < 0 is treated as "offload everything" (clamped to 999).
// contextSize and parallelSlots are only forwarded when positive.
//
//	s, err := startServer("/usr/local/bin/llama-server", "/data/model.gguf", 99, 4096, 4)
//	defer s.stop()
func startServer(binary, modelPath string, gpuLayers, contextSize, parallelSlots int) (*server, error) {
	if gpuLayers < 0 {
		gpuLayers = 999
	}
	const maxAttempts = 3
	var lastErr error
	for attempt := range maxAttempts {
		port, err := freePort()
		if err != nil {
			return nil, coreerr.E("rocm.startServer", "find free port", err)
		}
		args := []string{
			"--model", modelPath,
			"--host", "127.0.0.1",
			"--port", strconv.Itoa(port),
			"--n-gpu-layers", strconv.Itoa(gpuLayers),
		}
		if contextSize > 0 {
			args = append(args, "--ctx-size", strconv.Itoa(contextSize))
		}
		if parallelSlots > 0 {
			args = append(args, "--parallel", strconv.Itoa(parallelSlots))
		}
		cmd := exec.Command(binary, args...)
		// serverEnv masks the iGPU via HIP_VISIBLE_DEVICES=0.
		cmd.Env = serverEnv()
		if err := cmd.Start(); err != nil {
			return nil, coreerr.E("rocm.startServer", "start llama-server", err)
		}
		subprocess := &server{
			cmd:    cmd,
			port:   port,
			client: llamacpp.NewClient(core.Sprintf("http://127.0.0.1:%d", port)),
			exited: make(chan struct{}),
		}
		// Waiter goroutine: record the exit error, then signal exit by
		// closing the channel. exitErr must only be read after <-exited.
		go func() {
			subprocess.exitErr = cmd.Wait()
			close(subprocess.exited)
		}()
		// Give the server up to 60s to answer its health endpoint.
		ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
		err = subprocess.waitReady(ctx)
		cancel()
		if err == nil {
			return subprocess, nil
		}
		// Only retry if the process actually exited (e.g. port conflict).
		// A timeout means the server is stuck, not a port issue.
		select {
		case <-subprocess.exited:
			_ = subprocess.stop()
			lastErr = coreerr.E("rocm.startServer", core.Sprintf("attempt %d", attempt+1), err)
			continue
		default:
			_ = subprocess.stop()
			return nil, coreerr.E("rocm.startServer", "llama-server not ready", err)
		}
	}
	return nil, coreerr.E("rocm.startServer", core.Sprintf("server failed after %d attempts", maxAttempts), lastErr)
}
// waitReady polls the health endpoint every 100ms until the server
// responds, the process exits, or ctx is cancelled — whichever
// happens first.
func (s *server) waitReady(ctx context.Context) error {
	const pollInterval = 100 * time.Millisecond
	poll := time.NewTicker(pollInterval)
	defer poll.Stop()
	for {
		select {
		case <-poll.C:
			if healthErr := s.client.Health(ctx); healthErr == nil {
				return nil
			}
		case <-s.exited:
			return coreerr.E("server.waitReady", "llama-server exited before becoming ready", s.exitErr)
		case <-ctx.Done():
			return coreerr.E("server.waitReady", "timeout waiting for llama-server", ctx.Err())
		}
	}
}
// stop terminates the llama-server subprocess: it sends SIGTERM and
// waits up to 5 seconds for a clean exit, then falls back to SIGKILL.
// It returns the process exit error (as recorded by the waiter
// goroutine), or an error from the signalling itself.
//
// stop is safe to call after the process has already exited; in that
// case it simply returns the recorded exit error.
func (s *server) stop() error {
	if s.cmd.Process == nil {
		return nil
	}
	// Already exited? exitErr is safe to read once exited is closed.
	select {
	case <-s.exited:
		return s.exitErr
	default:
	}
	// Send SIGTERM for graceful shutdown. The process may exit between
	// the non-blocking check above and this signal; Signal then reports
	// os.ErrProcessDone, which is a normal exit here, not a failure.
	if err := s.cmd.Process.Signal(syscall.SIGTERM); err != nil {
		if errors.Is(err, os.ErrProcessDone) {
			<-s.exited
			return s.exitErr
		}
		return coreerr.E("server.stop", "sigterm llama-server", err)
	}
	// Wait up to 5 seconds for clean exit.
	select {
	case <-s.exited:
		return s.exitErr
	case <-time.After(5 * time.Second):
		// Force kill. The same race applies: Kill on an already-finished
		// process reports os.ErrProcessDone, which is not a failure.
		if err := s.cmd.Process.Kill(); err != nil && !errors.Is(err, os.ErrProcessDone) {
			return coreerr.E("server.stop", "kill llama-server", err)
		}
		<-s.exited
		return s.exitErr
	}
}