diff --git a/pkg/framework/core/message_bus.go b/pkg/framework/core/message_bus.go index c01b3cc6..457ced2c 100644 --- a/pkg/framework/core/message_bus.go +++ b/pkg/framework/core/message_bus.go @@ -2,7 +2,6 @@ package core import ( "errors" - "fmt" "sync" ) @@ -35,7 +34,7 @@ func (b *messageBus) action(msg Message) error { var agg error for _, h := range handlers { if err := h(b.core, msg); err != nil { - agg = fmt.Errorf("%w; %v", agg, err) + agg = errors.Join(agg, err) } } return agg diff --git a/pkg/framework/core/message_bus_test.go b/pkg/framework/core/message_bus_test.go index d661dc73..e69ac95e 100644 --- a/pkg/framework/core/message_bus_test.go +++ b/pkg/framework/core/message_bus_test.go @@ -1,6 +1,7 @@ package core import ( + "errors" "sync" "testing" @@ -25,6 +26,22 @@ func TestMessageBus_Action_Good(t *testing.T) { assert.Len(t, received, 2) } +func TestMessageBus_Action_Bad(t *testing.T) { + c, _ := New() + + err1 := errors.New("handler1 failed") + err2 := errors.New("handler2 failed") + + c.bus.registerAction(func(_ *Core, msg Message) error { return err1 }) + c.bus.registerAction(func(_ *Core, msg Message) error { return nil }) + c.bus.registerAction(func(_ *Core, msg Message) error { return err2 }) + + err := c.bus.action("test") + assert.Error(t, err) + assert.ErrorIs(t, err, err1) + assert.ErrorIs(t, err, err2) +} + func TestMessageBus_RegisterAction_Good(t *testing.T) { c, _ := New() diff --git a/pkg/framework/core/service_manager.go b/pkg/framework/core/service_manager.go index b2996c47..80c208fd 100644 --- a/pkg/framework/core/service_manager.go +++ b/pkg/framework/core/service_manager.go @@ -26,14 +26,14 @@ func newServiceManager() *serviceManager { // registerService adds a named service to the registry. // It also appends to startables/stoppables if the service implements those interfaces. func (m *serviceManager) registerService(name string, svc any) error { - if m.locked { - return fmt.Errorf("core: service %q is not permitted by the serviceLock setting", name) - } if name == "" { return fmt.Errorf("core: service name cannot be empty") } m.mu.Lock() defer m.mu.Unlock() + if m.locked { + return fmt.Errorf("core: service %q is not permitted by the serviceLock setting", name) + } if _, exists := m.services[name]; exists { return fmt.Errorf("core: service %q already registered", name) } @@ -62,12 +62,16 @@ func (m *serviceManager) service(name string) any { // enableLock marks that the lock should be applied after initialisation. func (m *serviceManager) enableLock() { + m.mu.Lock() + defer m.mu.Unlock() m.lockEnabled = true } // applyLock activates the service lock if it was enabled. // Called once during New() after all options have been processed. func (m *serviceManager) applyLock() { + m.mu.Lock() + defer m.mu.Unlock() if m.lockEnabled { m.locked = true }