feat(runtime): make provider lifecycle and API more agent-friendly
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
67156a6870
commit
7d5c562818
3 changed files with 110 additions and 45 deletions
35
providers.go
35
providers.go
|
|
@ -46,6 +46,7 @@ func (p *ProvidersAPI) list(c *gin.Context) {
|
|||
|
||||
for _, info := range registryInfo {
|
||||
dto := providerDTO{
|
||||
Code: info.Name,
|
||||
Name: info.Name,
|
||||
BasePath: info.BasePath,
|
||||
Channels: info.Channels,
|
||||
|
|
@ -60,22 +61,26 @@ func (p *ProvidersAPI) list(c *gin.Context) {
|
|||
providers = append(providers, dto)
|
||||
}
|
||||
|
||||
seen := make(map[string]struct{}, len(providers))
|
||||
for _, p := range providers {
|
||||
seen[p.Code+"|"+p.BasePath] = struct{}{}
|
||||
}
|
||||
|
||||
// Add runtime providers not already in registry
|
||||
for _, ri := range runtimeInfo {
|
||||
found := false
|
||||
for _, p := range providers {
|
||||
if p.Name == ri.Code {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
providers = append(providers, providerDTO{
|
||||
Name: ri.Code,
|
||||
BasePath: ri.Namespace,
|
||||
Status: "active",
|
||||
})
|
||||
if _, found := seen[ri.Code+"|"+ri.Namespace]; found {
|
||||
continue
|
||||
}
|
||||
|
||||
providers = append(providers, providerDTO{
|
||||
Code: ri.Code,
|
||||
Name: ri.Code,
|
||||
Display: ri.Name,
|
||||
Version: ri.Version,
|
||||
Namespace: ri.Namespace,
|
||||
BasePath: ri.Namespace,
|
||||
Status: ri.Status,
|
||||
})
|
||||
}
|
||||
|
||||
// Keep output deterministic for agents and tooling.
|
||||
|
|
@ -94,8 +99,12 @@ type providersResponse struct {
|
|||
}
|
||||
|
||||
type providerDTO struct {
|
||||
Code string `json:"code"`
|
||||
Name string `json:"name"`
|
||||
Display string `json:"displayName,omitempty"`
|
||||
BasePath string `json:"basePath"`
|
||||
Version string `json:"version"`
|
||||
Namespace string `json:"namespace"`
|
||||
Status string `json:"status,omitempty"`
|
||||
Element *elementDTO `json:"element,omitempty"`
|
||||
Channels []string `json:"channels,omitempty"`
|
||||
|
|
|
|||
114
runtime.go
114
runtime.go
|
|
@ -55,9 +55,6 @@ func defaultProvidersDir() string {
|
|||
// Providers that fail to start are logged and skipped — they do not prevent
|
||||
// other providers from starting.
|
||||
func (rm *RuntimeManager) StartAll(ctx context.Context) error {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
|
||||
dir := defaultProvidersDir()
|
||||
discovered, err := marketplace.DiscoverProviders(dir)
|
||||
if err != nil {
|
||||
|
|
@ -71,22 +68,30 @@ func (rm *RuntimeManager) StartAll(ctx context.Context) error {
|
|||
|
||||
corelib.Info("runtime: discovered providers", "count", len(discovered), "dir", dir)
|
||||
|
||||
started := make([]*RuntimeProvider, 0, len(discovered))
|
||||
for _, dp := range discovered {
|
||||
rp, err := rm.startProvider(ctx, dp)
|
||||
if err != nil {
|
||||
corelib.Warn("runtime: failed to start provider", "code", dp.Manifest.Code, "err", err)
|
||||
continue
|
||||
}
|
||||
rm.providers = append(rm.providers, rp)
|
||||
started = append(started, rp)
|
||||
corelib.Info("runtime: started provider", "code", dp.Manifest.Code, "port", rp.Port)
|
||||
}
|
||||
|
||||
rm.mu.Lock()
|
||||
rm.providers = append(rm.providers, started...)
|
||||
rm.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// startProvider starts a single provider binary and registers its proxy.
|
||||
func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.DiscoveredProvider) (*RuntimeProvider, error) {
|
||||
m := dp.Manifest
|
||||
if rm.engine == nil {
|
||||
return nil, corelib.E("runtime.startProvider", "runtime engine not configured", nil)
|
||||
}
|
||||
|
||||
// Assign a free port.
|
||||
port, err := findFreePort()
|
||||
|
|
@ -117,9 +122,9 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc
|
|||
|
||||
// Wait for health check.
|
||||
healthURL := corelib.Sprintf("http://127.0.0.1:%d/health", port)
|
||||
if err := waitForHealth(healthURL, 10*time.Second); err != nil {
|
||||
if err := waitForHealth(ctx, healthURL, 10*time.Second); err != nil {
|
||||
// Kill the process if health check fails.
|
||||
_ = cmd.Process.Kill()
|
||||
stopProviderProcess(cmd, 2*time.Second)
|
||||
return nil, corelib.E("runtime.startProvider", corelib.Sprintf("health check failed for %s", m.Code), err)
|
||||
}
|
||||
|
||||
|
|
@ -168,27 +173,20 @@ func (rm *RuntimeManager) startProvider(ctx context.Context, dp marketplace.Disc
|
|||
// StopAll terminates all running provider processes.
|
||||
func (rm *RuntimeManager) StopAll() {
|
||||
rm.mu.Lock()
|
||||
defer rm.mu.Unlock()
|
||||
providers := rm.providers
|
||||
rm.providers = nil
|
||||
rm.mu.Unlock()
|
||||
|
||||
for _, rp := range rm.providers {
|
||||
for _, rp := range providers {
|
||||
if rp.Cmd != nil && rp.Cmd.Process != nil {
|
||||
corelib.Info("runtime: stopping provider", "code", rp.Manifest.Code, "pid", rp.Cmd.Process.Pid)
|
||||
_ = rp.Cmd.Process.Signal(os.Interrupt)
|
||||
|
||||
// Give the process 5 seconds to exit gracefully.
|
||||
done := make(chan error, 1)
|
||||
go func() { done <- rp.Cmd.Wait() }()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Exited cleanly.
|
||||
case <-time.After(5 * time.Second):
|
||||
_ = rp.Cmd.Process.Kill()
|
||||
name := ""
|
||||
if rp.Manifest != nil {
|
||||
name = rp.Manifest.Code
|
||||
}
|
||||
corelib.Info("runtime: stopping provider", "code", name, "pid", rp.Cmd.Process.Pid)
|
||||
stopProviderProcess(rp.Cmd, 5*time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
rm.providers = nil
|
||||
}
|
||||
|
||||
// List returns a copy of all running provider info.
|
||||
|
|
@ -205,6 +203,7 @@ func (rm *RuntimeManager) List() []RuntimeProviderInfo {
|
|||
Namespace: rp.Manifest.Namespace,
|
||||
Port: rp.Port,
|
||||
Dir: rp.Dir,
|
||||
Status: "running",
|
||||
})
|
||||
}
|
||||
return infos
|
||||
|
|
@ -218,6 +217,7 @@ type RuntimeProviderInfo struct {
|
|||
Namespace string `json:"namespace"`
|
||||
Port int `json:"port"`
|
||||
Dir string `json:"dir"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// findFreePort asks the OS for an available TCP port on 127.0.0.1.
|
||||
|
|
@ -235,22 +235,78 @@ func findFreePort() (int, error) {
|
|||
}
|
||||
|
||||
// waitForHealth polls a health URL until it returns 200 or the timeout expires.
|
||||
func waitForHealth(url string, timeout time.Duration) error {
|
||||
deadline := time.Now().Add(timeout)
|
||||
client := &http.Client{Timeout: 2 * time.Second}
|
||||
func waitForHealth(ctx context.Context, url string, timeout time.Duration) error {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
if timeout <= 0 {
|
||||
timeout = 5 * time.Second
|
||||
}
|
||||
|
||||
for time.Now().Before(deadline) {
|
||||
resp, err := client.Get(url)
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
client := &http.Client{Timeout: 2 * time.Second}
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
req, err := http.NewRequestWithContext(timeoutCtx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return corelib.E("runtime.waitForHealth", "create health request", err)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err == nil {
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
select {
|
||||
case <-timeoutCtx.Done():
|
||||
return corelib.E(
|
||||
"runtime.waitForHealth",
|
||||
corelib.Sprintf("timed out after %s: %s", timeout, url),
|
||||
timeoutCtx.Err(),
|
||||
)
|
||||
case <-ticker.C:
|
||||
// Retry health check.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func stopProviderProcess(cmd *exec.Cmd, timeout time.Duration) {
|
||||
if cmd == nil || cmd.Process == nil {
|
||||
return
|
||||
}
|
||||
|
||||
return corelib.E("runtime.waitForHealth", corelib.Sprintf("timed out after %s: %s", timeout, url), nil)
|
||||
_ = cmd.Process.Signal(os.Interrupt)
|
||||
if stopProviderProcessWait(cmd, timeout) {
|
||||
return
|
||||
}
|
||||
|
||||
_ = cmd.Process.Kill()
|
||||
stopProviderProcessWait(cmd, 2*time.Second)
|
||||
}
|
||||
|
||||
func stopProviderProcessWait(cmd *exec.Cmd, timeout time.Duration) bool {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
_ = cmd.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return true
|
||||
case <-timer.C:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// staticAssetGroup is a simple RouteGroup that serves static files.
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ func TestWaitForHealth_Good(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
err := waitForHealth(srv.URL, 5*time.Second)
|
||||
err := waitForHealth(context.Background(), srv.URL, 5*time.Second)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
@ -46,13 +46,13 @@ func TestWaitForHealth_Bad_Timeout(t *testing.T) {
|
|||
}))
|
||||
defer srv.Close()
|
||||
|
||||
err := waitForHealth(srv.URL, 500*time.Millisecond)
|
||||
err := waitForHealth(context.Background(), srv.URL, 500*time.Millisecond)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "timed out")
|
||||
}
|
||||
|
||||
func TestWaitForHealth_Bad_NoServer(t *testing.T) {
|
||||
err := waitForHealth("http://127.0.0.1:1", 500*time.Millisecond)
|
||||
err := waitForHealth(context.Background(), "http://127.0.0.1:1", 500*time.Millisecond)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "timed out")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue