From 7d5c562818bf4a38d6be228ebeeb3596c38bb5ab Mon Sep 17 00:00:00 2001 From: Virgil Date: Mon, 30 Mar 2026 07:56:28 +0000 Subject: [PATCH] feat(runtime): make provider lifecycle and API more agent-friendly Co-Authored-By: Virgil --- providers.go | 35 +++++++++------ runtime.go | 114 ++++++++++++++++++++++++++++++++++++------------ runtime_test.go | 6 +-- 3 files changed, 110 insertions(+), 45 deletions(-) diff --git a/providers.go b/providers.go index 0ef12ad..7d89f79 100644 --- a/providers.go +++ b/providers.go @@ -46,6 +46,7 @@ func (p *ProvidersAPI) list(c *gin.Context) { for _, info := range registryInfo { dto := providerDTO{ + Code: info.Name, Name: info.Name, BasePath: info.BasePath, Channels: info.Channels, @@ -60,22 +61,26 @@ func (p *ProvidersAPI) list(c *gin.Context) { providers = append(providers, dto) } + seen := make(map[string]struct{}, len(providers)) + for _, p := range providers { + seen[p.Code+"|"+p.BasePath] = struct{}{} + } + // Add runtime providers not already in registry for _, ri := range runtimeInfo { - found := false - for _, p := range providers { - if p.Name == ri.Code { - found = true - break - } - } - if !found { - providers = append(providers, providerDTO{ - Name: ri.Code, - BasePath: ri.Namespace, - Status: "active", - }) + if _, found := seen[ri.Code+"|"+ri.Namespace]; found { + continue } + + providers = append(providers, providerDTO{ + Code: ri.Code, + Name: ri.Code, + Display: ri.Name, + Version: ri.Version, + Namespace: ri.Namespace, + BasePath: ri.Namespace, + Status: ri.Status, + }) } // Keep output deterministic for agents and tooling. @@ -94,8 +99,12 @@ type providersResponse struct { } type providerDTO struct { + Code string `json:"code"` Name string `json:"name"` + Display string `json:"displayName,omitempty"` BasePath string `json:"basePath"` + Version string `json:"version"` + Namespace string `json:"namespace"` Status string `json:"status,omitempty"` Element *elementDTO `json:"element,omitempty"` Channels []string `json:"channels,omitempty"` diff --git a/runtime.go b/runtime.go index 98f8572..d1ca47b 100644 --- a/runtime.go +++ b/runtime.go @@ -55,9 +55,6 @@ func defaultProvidersDir() string { // Providers that fail to start are logged and skipped — they do not prevent // other providers from starting. func (rm *RuntimeManager) StartAll(ctx context.Context) error { - rm.mu.Lock() - defer rm.mu.Unlock() - dir := defaultProvidersDir() discovered, err := marketplace.DiscoverProviders(dir) if err != nil { @@ -71,22 +68,30 @@ func (rm *RuntimeManager) StartAll(ctx context.Context) error { corelib.Info("runtime: discovered providers", "count", len(discovered), "dir", dir) + started := make([]*RuntimeProvider, 0, len(discovered)) for _, dp := range discovered { rp, err := rm.startProvider(ctx, dp) if err != nil { corelib.Warn("runtime: failed to start provider", "code", dp.Manifest.Code, "err", err) continue } - rm.providers = append(rm.providers, rp) + started = append(started, rp) corelib.Info("runtime: started provider", "code", dp.Manifest.Code, "port", rp.Port) } + rm.mu.Lock() + rm.providers = append(rm.providers, started...) + rm.mu.Unlock() + return nil } // startProvider starts a single provider binary and registers its proxy. func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.DiscoveredProvider) (*RuntimeProvider, error) { m := dp.Manifest + if rm.engine == nil { + return nil, corelib.E("runtime.startProvider", "runtime engine not configured", nil) + } // Assign a free port. port, err := findFreePort() @@ -117,9 +122,9 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc // Wait for health check. healthURL := corelib.Sprintf("http://127.0.0.1:%d/health", port) - if err := waitForHealth(healthURL, 10*time.Second); err != nil { + if err := waitForHealth(ctx, healthURL, 10*time.Second); err != nil { // Kill the process if health check fails. - _ = cmd.Process.Kill() + stopProviderProcess(cmd, 2*time.Second) return nil, corelib.E("runtime.startProvider", corelib.Sprintf("health check failed for %s", m.Code), err) } @@ -168,27 +173,20 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc // StopAll terminates all running provider processes. func (rm *RuntimeManager) StopAll() { rm.mu.Lock() - defer rm.mu.Unlock() + providers := rm.providers + rm.providers = nil + rm.mu.Unlock() - for _, rp := range rm.providers { + for _, rp := range providers { if rp.Cmd != nil && rp.Cmd.Process != nil { - corelib.Info("runtime: stopping provider", "code", rp.Manifest.Code, "pid", rp.Cmd.Process.Pid) - _ = rp.Cmd.Process.Signal(os.Interrupt) - - // Give the process 5 seconds to exit gracefully. - done := make(chan error, 1) - go func() { done <- rp.Cmd.Wait() }() - - select { - case <-done: - // Exited cleanly. - case <-time.After(5 * time.Second): - _ = rp.Cmd.Process.Kill() + name := "" + if rp.Manifest != nil { + name = rp.Manifest.Code } + corelib.Info("runtime: stopping provider", "code", name, "pid", rp.Cmd.Process.Pid) + stopProviderProcess(rp.Cmd, 5*time.Second) } } - - rm.providers = nil } // List returns a copy of all running provider info. @@ -205,6 +203,7 @@ func (rm *RuntimeManager) List() []RuntimeProviderInfo { Namespace: rp.Manifest.Namespace, Port: rp.Port, Dir: rp.Dir, + Status: "running", }) } return infos @@ -218,6 +217,7 @@ type RuntimeProviderInfo struct { Namespace string `json:"namespace"` Port int `json:"port"` Dir string `json:"dir"` + Status string `json:"status"` } // findFreePort asks the OS for an available TCP port on 127.0.0.1. @@ -235,22 +235,78 @@ func findFreePort() (int, error) { } // waitForHealth polls a health URL until it returns 200 or the timeout expires. -func waitForHealth(url string, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - client := &http.Client{Timeout: 2 * time.Second} +func waitForHealth(ctx context.Context, url string, timeout time.Duration) error { + if ctx == nil { + ctx = context.Background() + } + if timeout <= 0 { + timeout = 5 * time.Second + } - for time.Now().Before(deadline) { - resp, err := client.Get(url) + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + client := &http.Client{Timeout: 2 * time.Second} + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, url, nil) + if err != nil { + return corelib.E("runtime.waitForHealth", "create health request", err) + } + + resp, err := client.Do(req) if err == nil { resp.Body.Close() if resp.StatusCode == http.StatusOK { return nil } } - time.Sleep(100 * time.Millisecond) + + select { + case <-timeoutCtx.Done(): + return corelib.E( + "runtime.waitForHealth", + corelib.Sprintf("timed out after %s: %s", timeout, url), + timeoutCtx.Err(), + ) + case <-ticker.C: + // Retry health check. + } + } +} + +func stopProviderProcess(cmd *exec.Cmd, timeout time.Duration) { + if cmd == nil || cmd.Process == nil { + return } - return corelib.E("runtime.waitForHealth", corelib.Sprintf("timed out after %s: %s", timeout, url), nil) + _ = cmd.Process.Signal(os.Interrupt) + if stopProviderProcessWait(cmd, timeout) { + return + } + + _ = cmd.Process.Kill() + stopProviderProcessWait(cmd, 2*time.Second) +} + +func stopProviderProcessWait(cmd *exec.Cmd, timeout time.Duration) bool { + done := make(chan struct{}) + go func() { + _ = cmd.Wait() + close(done) + }() + + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-done: + return true + case <-timer.C: + return false + } } // staticAssetGroup is a simple RouteGroup that serves static files. diff --git a/runtime_test.go b/runtime_test.go index 6b36427..6e4dde5 100644 --- a/runtime_test.go +++ b/runtime_test.go @@ -36,7 +36,7 @@ func TestWaitForHealth_Good(t *testing.T) { })) defer srv.Close() - err := waitForHealth(srv.URL, 5*time.Second) + err := waitForHealth(context.Background(), srv.URL, 5*time.Second) assert.NoError(t, err) } @@ -46,13 +46,13 @@ func TestWaitForHealth_Bad_Timeout(t *testing.T) { })) defer srv.Close() - err := waitForHealth(srv.URL, 500*time.Millisecond) + err := waitForHealth(context.Background(), srv.URL, 500*time.Millisecond) require.Error(t, err) assert.Contains(t, err.Error(), "timed out") } func TestWaitForHealth_Bad_NoServer(t *testing.T) { - err := waitForHealth("http://127.0.0.1:1", 500*time.Millisecond) + err := waitForHealth(context.Background(), "http://127.0.0.1:1", 500*time.Millisecond) require.Error(t, err) assert.Contains(t, err.Error(), "timed out") }