fix(process): unregister daemon before health shutdown
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
b097e0ef0e
commit
945e760542
2 changed files with 91 additions and 7 deletions
14
daemon.go
14
daemon.go
|
|
@ -179,9 +179,10 @@ func (d *Daemon) Stop() error {
|
|||
d.health.SetReady(false)
|
||||
}
|
||||
|
||||
if d.health != nil {
|
||||
if err := d.health.Stop(shutdownCtx); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "health server", err))
|
||||
// Auto-unregister after the process is no longer serving traffic.
|
||||
if d.opts.Registry != nil {
|
||||
if err := d.opts.Registry.Unregister(d.opts.RegistryEntry.Code, d.opts.RegistryEntry.Daemon); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "registry", err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -191,10 +192,9 @@ func (d *Daemon) Stop() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Auto-unregister after the process is no longer serving traffic.
|
||||
if d.opts.Registry != nil {
|
||||
if err := d.opts.Registry.Unregister(d.opts.RegistryEntry.Code, d.opts.RegistryEntry.Daemon); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "registry", err))
|
||||
if d.health != nil {
|
||||
if err := d.health.Stop(shutdownCtx); err != nil {
|
||||
errs = append(errs, coreerr.E("Daemon.Stop", "health server", err))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -109,6 +109,90 @@ func TestDaemon_StopMarksNotReadyBeforeShutdownCompletes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDaemon_StopUnregistersBeforeHealthShutdownCompletes(t *testing.T) {
|
||||
blockCheck := make(chan struct{})
|
||||
checkEntered := make(chan struct{})
|
||||
var once sync.Once
|
||||
dir := t.TempDir()
|
||||
reg := NewRegistry(filepath.Join(dir, "registry"))
|
||||
|
||||
d := NewDaemon(DaemonOptions{
|
||||
HealthAddr: "127.0.0.1:0",
|
||||
ShutdownTimeout: 5 * time.Second,
|
||||
Registry: reg,
|
||||
RegistryEntry: DaemonEntry{
|
||||
Code: "test-app",
|
||||
Daemon: "serve",
|
||||
},
|
||||
HealthChecks: []HealthCheck{
|
||||
func() error {
|
||||
once.Do(func() { close(checkEntered) })
|
||||
<-blockCheck
|
||||
return nil
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
err := d.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
addr := d.HealthAddr()
|
||||
require.NotEmpty(t, addr)
|
||||
|
||||
healthErr := make(chan error, 1)
|
||||
go func() {
|
||||
resp, err := http.Get("http://" + addr + "/health")
|
||||
if err != nil {
|
||||
healthErr <- err
|
||||
return
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
healthErr <- nil
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-checkEntered:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("/health request did not enter the blocking check")
|
||||
}
|
||||
|
||||
stopDone := make(chan error, 1)
|
||||
go func() {
|
||||
stopDone <- d.Stop()
|
||||
}()
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return !d.Ready()
|
||||
}, 500*time.Millisecond, 10*time.Millisecond, "daemon should become not ready before shutdown completes")
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
_, ok := reg.Get("test-app", "serve")
|
||||
return !ok
|
||||
}, 500*time.Millisecond, 10*time.Millisecond, "daemon should unregister before health shutdown completes")
|
||||
|
||||
select {
|
||||
case err := <-stopDone:
|
||||
t.Fatalf("daemon stopped too early: %v", err)
|
||||
default:
|
||||
}
|
||||
|
||||
close(blockCheck)
|
||||
|
||||
select {
|
||||
case err := <-stopDone:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("daemon stop did not finish after health check unblocked")
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-healthErr:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("/health request did not finish")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDaemon_DoubleStartFails(t *testing.T) {
|
||||
d := NewDaemon(DaemonOptions{
|
||||
HealthAddr: "127.0.0.1:0",
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue