feat(runtime): align provider lifecycle with AX requirements
Improve provider discovery dedupe, startup/shutdown robustness, and health-check cancellation semantics. Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
0669feb69b
commit
f361fd69f6
3 changed files with 137 additions and 54 deletions
55
providers.go
55
providers.go
|
|
@ -3,6 +3,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"forge.lthn.ai/core/api/pkg/provider"
|
"forge.lthn.ai/core/api/pkg/provider"
|
||||||
|
|
@ -40,15 +41,26 @@ func (p *ProvidersAPI) list(c *gin.Context) {
|
||||||
registryInfo := p.registry.Info()
|
registryInfo := p.registry.Info()
|
||||||
runtimeInfo := p.runtime.List()
|
runtimeInfo := p.runtime.List()
|
||||||
|
|
||||||
// Merge runtime provider info with registry info
|
// Merge registry and runtime provider data without duplication.
|
||||||
providers := make([]providerDTO, 0, len(registryInfo)+len(runtimeInfo))
|
providers := make([]providerDTO, 0, len(registryInfo)+len(runtimeInfo))
|
||||||
|
seen := make(map[string]struct{}, len(registryInfo)+len(runtimeInfo))
|
||||||
|
|
||||||
|
providerKey := func(name, namespace string) string {
|
||||||
|
return fmt.Sprintf("%s|%s", name, namespace)
|
||||||
|
}
|
||||||
|
|
||||||
for _, info := range registryInfo {
|
for _, info := range registryInfo {
|
||||||
|
key := providerKey(info.Name, info.BasePath)
|
||||||
|
if _, ok := seen[key]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[key] = struct{}{}
|
||||||
|
|
||||||
dto := providerDTO{
|
dto := providerDTO{
|
||||||
Name: info.Name,
|
Name: info.Name,
|
||||||
BasePath: info.BasePath,
|
BasePath: info.BasePath,
|
||||||
Channels: info.Channels,
|
|
||||||
Status: "active",
|
Status: "active",
|
||||||
|
Channels: info.Channels,
|
||||||
}
|
}
|
||||||
if info.Element != nil {
|
if info.Element != nil {
|
||||||
dto.Element = &elementDTO{
|
dto.Element = &elementDTO{
|
||||||
|
|
@ -61,20 +73,20 @@ func (p *ProvidersAPI) list(c *gin.Context) {
|
||||||
|
|
||||||
// Add runtime providers not already in registry
|
// Add runtime providers not already in registry
|
||||||
for _, ri := range runtimeInfo {
|
for _, ri := range runtimeInfo {
|
||||||
found := false
|
key := providerKey(ri.Code, ri.Namespace)
|
||||||
for _, p := range providers {
|
if _, ok := seen[key]; ok {
|
||||||
if p.Name == ri.Code {
|
continue
|
||||||
found = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !found {
|
|
||||||
providers = append(providers, providerDTO{
|
|
||||||
Name: ri.Code,
|
|
||||||
BasePath: ri.Namespace,
|
|
||||||
Status: "active",
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
seen[key] = struct{}{}
|
||||||
|
|
||||||
|
providers = append(providers, providerDTO{
|
||||||
|
Name: ri.Code,
|
||||||
|
BasePath: ri.Namespace,
|
||||||
|
Status: ri.Status,
|
||||||
|
Code: ri.Code,
|
||||||
|
Version: ri.Version,
|
||||||
|
Namespace: ri.Namespace,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
c.JSON(http.StatusOK, providersResponse{Providers: providers})
|
c.JSON(http.StatusOK, providersResponse{Providers: providers})
|
||||||
|
|
@ -85,11 +97,14 @@ type providersResponse struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type providerDTO struct {
|
type providerDTO struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
BasePath string `json:"basePath"`
|
BasePath string `json:"basePath"`
|
||||||
Status string `json:"status,omitempty"`
|
Status string `json:"status,omitempty"`
|
||||||
Element *elementDTO `json:"element,omitempty"`
|
Code string `json:"code,omitempty"`
|
||||||
Channels []string `json:"channels,omitempty"`
|
Version string `json:"version,omitempty"`
|
||||||
|
Namespace string `json:"namespace,omitempty"`
|
||||||
|
Element *elementDTO `json:"element,omitempty"`
|
||||||
|
Channels []string `json:"channels,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type elementDTO struct {
|
type elementDTO struct {
|
||||||
|
|
|
||||||
127
runtime.go
127
runtime.go
|
|
@ -58,9 +58,6 @@ func defaultProvidersDir() string {
|
||||||
// Providers that fail to start are logged and skipped — they do not prevent
|
// Providers that fail to start are logged and skipped — they do not prevent
|
||||||
// other providers from starting.
|
// other providers from starting.
|
||||||
func (rm *RuntimeManager) StartAll(ctx context.Context) error {
|
func (rm *RuntimeManager) StartAll(ctx context.Context) error {
|
||||||
rm.mu.Lock()
|
|
||||||
defer rm.mu.Unlock()
|
|
||||||
|
|
||||||
dir := defaultProvidersDir()
|
dir := defaultProvidersDir()
|
||||||
discovered, err := marketplace.DiscoverProviders(dir)
|
discovered, err := marketplace.DiscoverProviders(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -73,23 +70,39 @@ func (rm *RuntimeManager) StartAll(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("runtime: discovered %d provider(s) in %s", len(discovered), dir)
|
log.Printf("runtime: discovered %d provider(s) in %s", len(discovered), dir)
|
||||||
|
started := make([]*RuntimeProvider, 0, len(discovered))
|
||||||
|
seen := make(map[string]struct{}, len(discovered))
|
||||||
|
|
||||||
for _, dp := range discovered {
|
for _, dp := range discovered {
|
||||||
|
key := fmt.Sprintf("%s|%s", dp.Manifest.Code, dp.Manifest.Namespace)
|
||||||
|
if _, ok := seen[key]; ok {
|
||||||
|
log.Printf("runtime: skipped duplicate provider discovery for %s", dp.Manifest.Code)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
rp, err := rm.startProvider(ctx, dp)
|
rp, err := rm.startProvider(ctx, dp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("runtime: failed to start %s: %v", dp.Manifest.Code, err)
|
log.Printf("runtime: failed to start %s: %v", dp.Manifest.Code, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
rm.providers = append(rm.providers, rp)
|
seen[key] = struct{}{}
|
||||||
|
started = append(started, rp)
|
||||||
log.Printf("runtime: started %s on port %d", dp.Manifest.Code, rp.Port)
|
log.Printf("runtime: started %s on port %d", dp.Manifest.Code, rp.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rm.mu.Lock()
|
||||||
|
rm.providers = append(rm.providers, started...)
|
||||||
|
rm.mu.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// startProvider starts a single provider binary and registers its proxy.
|
// startProvider starts a single provider binary and registers its proxy.
|
||||||
func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.DiscoveredProvider) (*RuntimeProvider, error) {
|
func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.DiscoveredProvider) (*RuntimeProvider, error) {
|
||||||
m := dp.Manifest
|
m := dp.Manifest
|
||||||
|
if rm.engine == nil {
|
||||||
|
return nil, coreerr.E("runtime.startProvider", "runtime engine not configured", nil)
|
||||||
|
}
|
||||||
|
|
||||||
// Assign a free port.
|
// Assign a free port.
|
||||||
port, err := findFreePort()
|
port, err := findFreePort()
|
||||||
|
|
@ -120,9 +133,9 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc
|
||||||
|
|
||||||
// Wait for health check.
|
// Wait for health check.
|
||||||
healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", port)
|
healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", port)
|
||||||
if err := waitForHealth(healthURL, 10*time.Second); err != nil {
|
if err := waitForHealth(ctx, healthURL, 10*time.Second); err != nil {
|
||||||
// Kill the process if health check fails.
|
// Stop the process if health check fails.
|
||||||
_ = cmd.Process.Kill()
|
stopProviderProcess(cmd, 2*time.Second)
|
||||||
return nil, coreerr.E("runtime.startProvider", fmt.Sprintf("health check failed for %s", m.Code), err)
|
return nil, coreerr.E("runtime.startProvider", fmt.Sprintf("health check failed for %s", m.Code), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -171,27 +184,20 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc
|
||||||
// StopAll terminates all running provider processes.
|
// StopAll terminates all running provider processes.
|
||||||
func (rm *RuntimeManager) StopAll() {
|
func (rm *RuntimeManager) StopAll() {
|
||||||
rm.mu.Lock()
|
rm.mu.Lock()
|
||||||
defer rm.mu.Unlock()
|
providers := rm.providers
|
||||||
|
rm.providers = nil
|
||||||
|
rm.mu.Unlock()
|
||||||
|
|
||||||
for _, rp := range rm.providers {
|
for _, rp := range providers {
|
||||||
if rp.Cmd != nil && rp.Cmd.Process != nil {
|
if rp.Cmd != nil && rp.Cmd.Process != nil {
|
||||||
log.Printf("runtime: stopping %s (pid %d)", rp.Manifest.Code, rp.Cmd.Process.Pid)
|
name := ""
|
||||||
_ = rp.Cmd.Process.Signal(os.Interrupt)
|
if rp.Manifest != nil {
|
||||||
|
name = rp.Manifest.Code
|
||||||
// Give the process 5 seconds to exit gracefully.
|
|
||||||
done := make(chan error, 1)
|
|
||||||
go func() { done <- rp.Cmd.Wait() }()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// Exited cleanly.
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
_ = rp.Cmd.Process.Kill()
|
|
||||||
}
|
}
|
||||||
|
log.Printf("runtime: stopping provider (%s) pid %d", name, rp.Cmd.Process.Pid)
|
||||||
|
stopProviderProcess(rp.Cmd, 5*time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rm.providers = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a copy of all running provider info.
|
// List returns a copy of all running provider info.
|
||||||
|
|
@ -208,6 +214,7 @@ func (rm *RuntimeManager) List() []RuntimeProviderInfo {
|
||||||
Namespace: rp.Manifest.Namespace,
|
Namespace: rp.Manifest.Namespace,
|
||||||
Port: rp.Port,
|
Port: rp.Port,
|
||||||
Dir: rp.Dir,
|
Dir: rp.Dir,
|
||||||
|
Status: "running",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return infos
|
return infos
|
||||||
|
|
@ -221,6 +228,7 @@ type RuntimeProviderInfo struct {
|
||||||
Namespace string `json:"namespace"`
|
Namespace string `json:"namespace"`
|
||||||
Port int `json:"port"`
|
Port int `json:"port"`
|
||||||
Dir string `json:"dir"`
|
Dir string `json:"dir"`
|
||||||
|
Status string `json:"status"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// findFreePort asks the OS for an available TCP port on 127.0.0.1.
|
// findFreePort asks the OS for an available TCP port on 127.0.0.1.
|
||||||
|
|
@ -238,22 +246,81 @@ func findFreePort() (int, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// waitForHealth polls a health URL until it returns 200 or the timeout expires.
|
// waitForHealth polls a health URL until it returns 200 or the timeout expires.
|
||||||
func waitForHealth(url string, timeout time.Duration) error {
|
func waitForHealth(ctx context.Context, url string, timeout time.Duration) error {
|
||||||
deadline := time.Now().Add(timeout)
|
if ctx == nil {
|
||||||
client := &http.Client{Timeout: 2 * time.Second}
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
if timeout <= 0 {
|
||||||
|
timeout = 5 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
for time.Now().Before(deadline) {
|
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
resp, err := client.Get(url)
|
defer cancel()
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 2 * time.Second}
|
||||||
|
ticker := time.NewTicker(100 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return coreerr.E("runtime.waitForHealth", "create health request", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := client.Do(req)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
if resp.StatusCode == http.StatusOK {
|
if resp.StatusCode == http.StatusOK {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(100 * time.Millisecond)
|
select {
|
||||||
|
case <-timeoutCtx.Done():
|
||||||
|
return coreerr.E(
|
||||||
|
"runtime.waitForHealth",
|
||||||
|
fmt.Sprintf("timed out after %s: %s", timeout, url),
|
||||||
|
timeoutCtx.Err(),
|
||||||
|
)
|
||||||
|
case <-ticker.C:
|
||||||
|
// Keep polling until timeout.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func stopProviderProcess(cmd *exec.Cmd, timeout time.Duration) {
|
||||||
|
if cmd == nil || cmd.Process == nil {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return coreerr.E("runtime.waitForHealth", fmt.Sprintf("timed out after %s: %s", timeout, url), nil)
|
if timeout <= 0 {
|
||||||
|
timeout = 1 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = cmd.Process.Signal(os.Interrupt)
|
||||||
|
if stopProviderProcessWait(cmd, timeout) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = cmd.Process.Kill()
|
||||||
|
stopProviderProcessWait(cmd, 2*time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
func stopProviderProcessWait(cmd *exec.Cmd, timeout time.Duration) bool {
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
_ = cmd.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
timer := time.NewTimer(timeout)
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return true
|
||||||
|
case <-timer.C:
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// staticAssetGroup is a simple RouteGroup that serves static files.
|
// staticAssetGroup is a simple RouteGroup that serves static files.
|
||||||
|
|
|
||||||
|
|
@ -36,7 +36,7 @@ func TestWaitForHealth_Good(t *testing.T) {
|
||||||
}))
|
}))
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
||||||
err := waitForHealth(srv.URL, 5*time.Second)
|
err := waitForHealth(context.Background(), srv.URL, 5*time.Second)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -46,13 +46,13 @@ func TestWaitForHealth_Bad_Timeout(t *testing.T) {
|
||||||
}))
|
}))
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
|
|
||||||
err := waitForHealth(srv.URL, 500*time.Millisecond)
|
err := waitForHealth(context.Background(), srv.URL, 500*time.Millisecond)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Contains(t, err.Error(), "timed out")
|
assert.Contains(t, err.Error(), "timed out")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWaitForHealth_Bad_NoServer(t *testing.T) {
|
func TestWaitForHealth_Bad_NoServer(t *testing.T) {
|
||||||
err := waitForHealth("http://127.0.0.1:1", 500*time.Millisecond)
|
err := waitForHealth(context.Background(), "http://127.0.0.1:1", 500*time.Millisecond)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
assert.Contains(t, err.Error(), "timed out")
|
assert.Contains(t, err.Error(), "timed out")
|
||||||
}
|
}
|
||||||
|
|
@ -79,7 +79,7 @@ func TestRuntimeManager_List_Good_WithProviders(t *testing.T) {
|
||||||
Code: "test-svc",
|
Code: "test-svc",
|
||||||
Name: "Test Service",
|
Name: "Test Service",
|
||||||
Version: "1.0.0",
|
Version: "1.0.0",
|
||||||
Namespace: "test",
|
Namespace: "test",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -92,6 +92,7 @@ func TestRuntimeManager_List_Good_WithProviders(t *testing.T) {
|
||||||
assert.Equal(t, "test", infos[0].Namespace)
|
assert.Equal(t, "test", infos[0].Namespace)
|
||||||
assert.Equal(t, 12345, infos[0].Port)
|
assert.Equal(t, 12345, infos[0].Port)
|
||||||
assert.Equal(t, "/tmp/test-provider", infos[0].Dir)
|
assert.Equal(t, "/tmp/test-provider", infos[0].Dir)
|
||||||
|
assert.Equal(t, "running", infos[0].Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRuntimeManager_StopAll_Good_Empty(t *testing.T) {
|
func TestRuntimeManager_StopAll_Good_Empty(t *testing.T) {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue