fix(process): mark daemon not-ready before shutdown
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
911abb6ee8
commit
b097e0ef0e
2 changed files with 85 additions and 8 deletions
20
daemon.go
20
daemon.go
|
|
@ -174,10 +174,14 @@ func (d *Daemon) Stop() error {
|
|||
shutdownCtx, cancel := context.WithTimeout(context.Background(), d.opts.ShutdownTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Auto-unregister
|
||||
if d.opts.Registry != nil {
|
||||
if err := d.opts.Registry.Unregister(d.opts.RegistryEntry.Code, d.opts.RegistryEntry.Daemon); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "registry", err))
|
||||
// Mark the daemon unavailable before tearing down listeners or registry state.
|
||||
if d.health != nil {
|
||||
d.health.SetReady(false)
|
||||
}
|
||||
|
||||
if d.health != nil {
|
||||
if err := d.health.Stop(shutdownCtx); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "health server", err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -187,10 +191,10 @@ func (d *Daemon) Stop() error {
|
|||
}
|
||||
}
|
||||
|
||||
if d.health != nil {
|
||||
d.health.SetReady(false)
|
||||
if err := d.health.Stop(shutdownCtx); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "health server", err))
|
||||
// Auto-unregister after the process is no longer serving traffic.
|
||||
if d.opts.Registry != nil {
|
||||
if err := d.opts.Registry.Unregister(d.opts.RegistryEntry.Code, d.opts.RegistryEntry.Daemon); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "registry", err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -36,6 +37,78 @@ func TestDaemon_StartAndStop(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestDaemon_StopMarksNotReadyBeforeShutdownCompletes(t *testing.T) {
|
||||
blockCheck := make(chan struct{})
|
||||
checkEntered := make(chan struct{})
|
||||
var once sync.Once
|
||||
|
||||
d := NewDaemon(DaemonOptions{
|
||||
HealthAddr: "127.0.0.1:0",
|
||||
ShutdownTimeout: 5 * time.Second,
|
||||
HealthChecks: []HealthCheck{
|
||||
func() error {
|
||||
once.Do(func() { close(checkEntered) })
|
||||
<-blockCheck
|
||||
return nil
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
err := d.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
addr := d.HealthAddr()
|
||||
require.NotEmpty(t, addr)
|
||||
|
||||
healthErr := make(chan error, 1)
|
||||
go func() {
|
||||
resp, err := http.Get("http://" + addr + "/health")
|
||||
if err != nil {
|
||||
healthErr <- err
|
||||
return
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
healthErr <- nil
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-checkEntered:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("/health request did not enter the blocking check")
|
||||
}
|
||||
|
||||
stopDone := make(chan error, 1)
|
||||
go func() {
|
||||
stopDone <- d.Stop()
|
||||
}()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return !d.Ready()
|
||||
}, 500*time.Millisecond, 10*time.Millisecond, "daemon should become not ready before shutdown completes")
|
||||
|
||||
select {
|
||||
case err := <-stopDone:
|
||||
t.Fatalf("daemon stopped too early: %v", err)
|
||||
default:
|
||||
}
|
||||
|
||||
close(blockCheck)
|
||||
|
||||
select {
|
||||
case err := <-stopDone:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("daemon stop did not finish after health check unblocked")
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-healthErr:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("/health request did not finish")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaemon_DoubleStartFails(t *testing.T) {
|
||||
d := NewDaemon(DaemonOptions{
|
||||
HealthAddr: "127.0.0.1:0",
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue