diff --git a/providers.go b/providers.go index 0df6830..50a21ef 100644 --- a/providers.go +++ b/providers.go @@ -3,6 +3,7 @@ package main import ( + "fmt" "net/http" "forge.lthn.ai/core/api/pkg/provider" @@ -40,15 +41,26 @@ func (p *ProvidersAPI) list(c *gin.Context) { registryInfo := p.registry.Info() runtimeInfo := p.runtime.List() - // Merge runtime provider info with registry info + // Merge registry and runtime provider data without duplication. providers := make([]providerDTO, 0, len(registryInfo)+len(runtimeInfo)) + seen := make(map[string]struct{}, len(registryInfo)+len(runtimeInfo)) + + providerKey := func(name, namespace string) string { + return fmt.Sprintf("%s|%s", name, namespace) + } for _, info := range registryInfo { + key := providerKey(info.Name, info.BasePath) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + dto := providerDTO{ Name: info.Name, BasePath: info.BasePath, - Channels: info.Channels, Status: "active", + Channels: info.Channels, } if info.Element != nil { dto.Element = &elementDTO{ @@ -61,20 +73,20 @@ func (p *ProvidersAPI) list(c *gin.Context) { // Add runtime providers not already in registry for _, ri := range runtimeInfo { - found := false - for _, p := range providers { - if p.Name == ri.Code { - found = true - break - } - } - if !found { - providers = append(providers, providerDTO{ - Name: ri.Code, - BasePath: ri.Namespace, - Status: "active", - }) + key := providerKey(ri.Code, ri.Namespace) + if _, ok := seen[key]; ok { + continue } + seen[key] = struct{}{} + + providers = append(providers, providerDTO{ + Name: ri.Code, + BasePath: ri.Namespace, + Status: ri.Status, + Code: ri.Code, + Version: ri.Version, + Namespace: ri.Namespace, + }) } c.JSON(http.StatusOK, providersResponse{Providers: providers}) @@ -85,11 +97,14 @@ type providersResponse struct { } type providerDTO struct { - Name string `json:"name"` - BasePath string `json:"basePath"` - Status string `json:"status,omitempty"` - Element *elementDTO `json:"element,omitempty"` - Channels []string `json:"channels,omitempty"` + Name string `json:"name"` + BasePath string `json:"basePath"` + Status string `json:"status,omitempty"` + Code string `json:"code,omitempty"` + Version string `json:"version,omitempty"` + Namespace string `json:"namespace,omitempty"` + Element *elementDTO `json:"element,omitempty"` + Channels []string `json:"channels,omitempty"` } type elementDTO struct { diff --git a/runtime.go b/runtime.go index f01db98..dc760ab 100644 --- a/runtime.go +++ b/runtime.go @@ -58,9 +58,6 @@ func defaultProvidersDir() string { // Providers that fail to start are logged and skipped — they do not prevent // other providers from starting. func (rm *RuntimeManager) StartAll(ctx context.Context) error { - rm.mu.Lock() - defer rm.mu.Unlock() - dir := defaultProvidersDir() discovered, err := marketplace.DiscoverProviders(dir) if err != nil { @@ -73,23 +70,39 @@ func (rm *RuntimeManager) StartAll(ctx context.Context) error { } log.Printf("runtime: discovered %d provider(s) in %s", len(discovered), dir) + started := make([]*RuntimeProvider, 0, len(discovered)) + seen := make(map[string]struct{}, len(discovered)) for _, dp := range discovered { + key := fmt.Sprintf("%s|%s", dp.Manifest.Code, dp.Manifest.Namespace) + if _, ok := seen[key]; ok { + log.Printf("runtime: skipped duplicate provider discovery for %s", dp.Manifest.Code) + continue + } + rp, err := rm.startProvider(ctx, dp) if err != nil { log.Printf("runtime: failed to start %s: %v", dp.Manifest.Code, err) continue } - rm.providers = append(rm.providers, rp) + seen[key] = struct{}{} + started = append(started, rp) log.Printf("runtime: started %s on port %d", dp.Manifest.Code, rp.Port) } + rm.mu.Lock() + rm.providers = append(rm.providers, started...) + rm.mu.Unlock() + return nil } // startProvider starts a single provider binary and registers its proxy. func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.DiscoveredProvider) (*RuntimeProvider, error) { m := dp.Manifest + if rm.engine == nil { + return nil, coreerr.E("runtime.startProvider", "runtime engine not configured", nil) + } // Assign a free port. port, err := findFreePort() @@ -120,9 +133,9 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc // Wait for health check. healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", port) - if err := waitForHealth(healthURL, 10*time.Second); err != nil { - // Kill the process if health check fails. - _ = cmd.Process.Kill() + if err := waitForHealth(ctx, healthURL, 10*time.Second); err != nil { + // Stop the process if health check fails. + stopProviderProcess(cmd, 2*time.Second) return nil, coreerr.E("runtime.startProvider", fmt.Sprintf("health check failed for %s", m.Code), err) } @@ -171,27 +184,20 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc // StopAll terminates all running provider processes. func (rm *RuntimeManager) StopAll() { rm.mu.Lock() - defer rm.mu.Unlock() + providers := rm.providers + rm.providers = nil + rm.mu.Unlock() - for _, rp := range rm.providers { + for _, rp := range providers { if rp.Cmd != nil && rp.Cmd.Process != nil { - log.Printf("runtime: stopping %s (pid %d)", rp.Manifest.Code, rp.Cmd.Process.Pid) - _ = rp.Cmd.Process.Signal(os.Interrupt) - - // Give the process 5 seconds to exit gracefully. - done := make(chan error, 1) - go func() { done <- rp.Cmd.Wait() }() - - select { - case <-done: - // Exited cleanly. - case <-time.After(5 * time.Second): - _ = rp.Cmd.Process.Kill() + name := "" + if rp.Manifest != nil { + name = rp.Manifest.Code } + log.Printf("runtime: stopping provider (%s) pid %d", name, rp.Cmd.Process.Pid) + stopProviderProcess(rp.Cmd, 5*time.Second) } } - - rm.providers = nil } // List returns a copy of all running provider info. @@ -208,6 +214,7 @@ func (rm *RuntimeManager) List() []RuntimeProviderInfo { Namespace: rp.Manifest.Namespace, Port: rp.Port, Dir: rp.Dir, + Status: "running", }) } return infos @@ -221,6 +228,7 @@ type RuntimeProviderInfo struct { Namespace string `json:"namespace"` Port int `json:"port"` Dir string `json:"dir"` + Status string `json:"status"` } // findFreePort asks the OS for an available TCP port on 127.0.0.1. @@ -238,22 +246,81 @@ func findFreePort() (int, error) { } // waitForHealth polls a health URL until it returns 200 or the timeout expires. -func waitForHealth(url string, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - client := &http.Client{Timeout: 2 * time.Second} +func waitForHealth(ctx context.Context, url string, timeout time.Duration) error { + if ctx == nil { + ctx = context.Background() + } + if timeout <= 0 { + timeout = 5 * time.Second + } - for time.Now().Before(deadline) { - resp, err := client.Get(url) + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + client := &http.Client{Timeout: 2 * time.Second} + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, url, nil) + if err != nil { + return coreerr.E("runtime.waitForHealth", "create health request", err) + } + + resp, err := client.Do(req) if err == nil { resp.Body.Close() if resp.StatusCode == http.StatusOK { return nil } } - time.Sleep(100 * time.Millisecond) + select { + case <-timeoutCtx.Done(): + return coreerr.E( + "runtime.waitForHealth", + fmt.Sprintf("timed out after %s: %s", timeout, url), + timeoutCtx.Err(), + ) + case <-ticker.C: + // Keep polling until timeout. + } + } +} + +func stopProviderProcess(cmd *exec.Cmd, timeout time.Duration) { + if cmd == nil || cmd.Process == nil { + return } - return coreerr.E("runtime.waitForHealth", fmt.Sprintf("timed out after %s: %s", timeout, url), nil) + if timeout <= 0 { + timeout = 1 * time.Second + } + + _ = cmd.Process.Signal(os.Interrupt) + if stopProviderProcessWait(cmd, timeout) { + return + } + + _ = cmd.Process.Kill() + stopProviderProcessWait(cmd, 2*time.Second) +} + +func stopProviderProcessWait(cmd *exec.Cmd, timeout time.Duration) bool { + done := make(chan struct{}) + go func() { + _ = cmd.Wait() + close(done) + }() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-done: + return true + case <-timer.C: + return false + } } // staticAssetGroup is a simple RouteGroup that serves static files. diff --git a/runtime_test.go b/runtime_test.go index 6b36427..193fb4c 100644 --- a/runtime_test.go +++ b/runtime_test.go @@ -36,7 +36,7 @@ func TestWaitForHealth_Good(t *testing.T) { })) defer srv.Close() - err := waitForHealth(srv.URL, 5*time.Second) + err := waitForHealth(context.Background(), srv.URL, 5*time.Second) assert.NoError(t, err) } @@ -46,13 +46,13 @@ func TestWaitForHealth_Bad_Timeout(t *testing.T) { })) defer srv.Close() - err := waitForHealth(srv.URL, 500*time.Millisecond) + err := waitForHealth(context.Background(), srv.URL, 500*time.Millisecond) require.Error(t, err) assert.Contains(t, err.Error(), "timed out") } func TestWaitForHealth_Bad_NoServer(t *testing.T) { - err := waitForHealth("http://127.0.0.1:1", 500*time.Millisecond) + err := waitForHealth(context.Background(), "http://127.0.0.1:1", 500*time.Millisecond) require.Error(t, err) assert.Contains(t, err.Error(), "timed out") } @@ -79,7 +79,7 @@ func TestRuntimeManager_List_Good_WithProviders(t *testing.T) { Code: "test-svc", Name: "Test Service", Version: "1.0.0", - Namespace: "test", + Namespace: "test", }, }, } @@ -92,6 +92,7 @@ func TestRuntimeManager_List_Good_WithProviders(t *testing.T) { assert.Equal(t, "test", infos[0].Namespace) assert.Equal(t, 12345, infos[0].Port) assert.Equal(t, "/tmp/test-provider", infos[0].Dir) + assert.Equal(t, "running", infos[0].Status) } func TestRuntimeManager_StopAll_Good_Empty(t *testing.T) {