From 4cd0cf14a3c16bb530b022383806e53c09cbe891 Mon Sep 17 00:00:00 2001 From: Snider Date: Wed, 4 Feb 2026 12:03:48 +0000 Subject: [PATCH] refactor(core): decompose Core into serviceManager + messageBus (#215) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract two focused, unexported components from the Core "god object": - serviceManager: owns service registry, lifecycle tracking (startables/ stoppables), and service lock - messageBus: owns IPC action dispatch, query handling, and task handling All public API methods on Core become one-line delegation wrappers. Zero consumer changes — no files outside pkg/framework/core/ modified. Co-Authored-By: Claude Opus 4.5 --- pkg/framework/core/core.go | 124 ++++--------------- pkg/framework/core/interfaces.go | 26 +--- pkg/framework/core/message_bus.go | 120 ++++++++++++++++++ pkg/framework/core/message_bus_test.go | 129 ++++++++++++++++++++ pkg/framework/core/service_manager.go | 90 ++++++++++++++ pkg/framework/core/service_manager_test.go | 135 +++++++++++++++++++++ 6 files changed, 502 insertions(+), 122 deletions(-) create mode 100644 pkg/framework/core/message_bus.go create mode 100644 pkg/framework/core/message_bus_test.go create mode 100644 pkg/framework/core/service_manager.go create mode 100644 pkg/framework/core/service_manager_test.go diff --git a/pkg/framework/core/core.go b/pkg/framework/core/core.go index 2f0f70c7..5f6c3938 100644 --- a/pkg/framework/core/core.go +++ b/pkg/framework/core/core.go @@ -7,6 +7,12 @@ import ( "fmt" "reflect" "strings" + "sync" +) + +var ( + instance *Core + instanceMu sync.RWMutex ) // New initialises a Core instance using the provided options and performs the necessary setup. @@ -20,18 +26,18 @@ import ( // ) func New(opts ...Option) (*Core, error) { c := &Core{ - services: make(map[string]any), Features: &Features{}, + svc: newServiceManager(), } + c.bus = newMessageBus(c) + for _, o := range opts { if err := o(c); err != nil { return nil, err } } - if c.serviceLock { - c.servicesLocked = true - } + c.svc.applyLock() return c, nil } @@ -121,7 +127,7 @@ func WithAssets(fs embed.FS) Option { // prevent late-binding of services that could have unintended consequences. func WithServiceLock() Option { return func(c *Core) error { - c.serviceLock = true + c.svc.enableLock() return nil } } @@ -131,9 +137,7 @@ func WithServiceLock() Option { // ServiceStartup is the entry point for the Core service's startup lifecycle. // It is called by the GUI runtime when the application starts. func (c *Core) ServiceStartup(ctx context.Context, options any) error { - c.serviceMu.RLock() - startables := append([]Startable(nil), c.startables...) - c.serviceMu.RUnlock() + startables := c.svc.getStartables() var agg error for _, s := range startables { @@ -157,10 +161,7 @@ func (c *Core) ServiceShutdown(ctx context.Context) error { agg = errors.Join(agg, err) } - c.serviceMu.RLock() - stoppables := append([]Stoppable(nil), c.stoppables...) - c.serviceMu.RUnlock() - + stoppables := c.svc.getStoppables() for i := len(stoppables) - 1; i >= 0; i-- { if err := stoppables[i].OnShutdown(ctx); err != nil { agg = errors.Join(agg, err) @@ -173,135 +174,56 @@ func (c *Core) ServiceShutdown(ctx context.Context) error { // ACTION dispatches a message to all registered IPC handlers. // This is the primary mechanism for services to communicate with each other. func (c *Core) ACTION(msg Message) error { - c.ipcMu.RLock() - handlers := append([]func(*Core, Message) error(nil), c.ipcHandlers...) - c.ipcMu.RUnlock() - - var agg error - for _, h := range handlers { - if err := h(c, msg); err != nil { - agg = fmt.Errorf("%w; %v", agg, err) - } - } - return agg + return c.bus.action(msg) } // RegisterAction adds a new IPC handler to the Core. func (c *Core) RegisterAction(handler func(*Core, Message) error) { - c.ipcMu.Lock() - c.ipcHandlers = append(c.ipcHandlers, handler) - c.ipcMu.Unlock() + c.bus.registerAction(handler) } // RegisterActions adds multiple IPC handlers to the Core. func (c *Core) RegisterActions(handlers ...func(*Core, Message) error) { - c.ipcMu.Lock() - c.ipcHandlers = append(c.ipcHandlers, handlers...) - c.ipcMu.Unlock() + c.bus.registerActions(handlers...) } // QUERY dispatches a query to handlers until one responds. // Returns (result, handled, error). If no handler responds, handled is false. func (c *Core) QUERY(q Query) (any, bool, error) { - c.queryMu.RLock() - handlers := append([]QueryHandler(nil), c.queryHandlers...) - c.queryMu.RUnlock() - - for _, h := range handlers { - result, handled, err := h(c, q) - if handled { - return result, true, err - } - } - return nil, false, nil + return c.bus.query(q) } // QUERYALL dispatches a query to all handlers and collects all responses. // Returns all results from handlers that responded. func (c *Core) QUERYALL(q Query) ([]any, error) { - c.queryMu.RLock() - handlers := append([]QueryHandler(nil), c.queryHandlers...) - c.queryMu.RUnlock() - - var results []any - var agg error - for _, h := range handlers { - result, handled, err := h(c, q) - if err != nil { - agg = errors.Join(agg, err) - } - if handled && result != nil { - results = append(results, result) - } - } - return results, agg + return c.bus.queryAll(q) } // PERFORM dispatches a task to handlers until one executes it. // Returns (result, handled, error). If no handler responds, handled is false. func (c *Core) PERFORM(t Task) (any, bool, error) { - c.taskMu.RLock() - handlers := append([]TaskHandler(nil), c.taskHandlers...) - c.taskMu.RUnlock() - - for _, h := range handlers { - result, handled, err := h(c, t) - if handled { - return result, true, err - } - } - return nil, false, nil + return c.bus.perform(t) } // RegisterQuery adds a query handler to the Core. func (c *Core) RegisterQuery(handler QueryHandler) { - c.queryMu.Lock() - c.queryHandlers = append(c.queryHandlers, handler) - c.queryMu.Unlock() + c.bus.registerQuery(handler) } // RegisterTask adds a task handler to the Core. func (c *Core) RegisterTask(handler TaskHandler) { - c.taskMu.Lock() - c.taskHandlers = append(c.taskHandlers, handler) - c.taskMu.Unlock() + c.bus.registerTask(handler) } // RegisterService adds a new service to the Core. func (c *Core) RegisterService(name string, api any) error { - if c.servicesLocked { - return fmt.Errorf("core: service %q is not permitted by the serviceLock setting", name) - } - if name == "" { - return errors.New("core: service name cannot be empty") - } - c.serviceMu.Lock() - defer c.serviceMu.Unlock() - if _, exists := c.services[name]; exists { - return fmt.Errorf("core: service %q already registered", name) - } - c.services[name] = api - - if s, ok := api.(Startable); ok { - c.startables = append(c.startables, s) - } - if s, ok := api.(Stoppable); ok { - c.stoppables = append(c.stoppables, s) - } - - return nil + return c.svc.registerService(name, api) } // Service retrieves a registered service by name. // It returns nil if the service is not found. func (c *Core) Service(name string) any { - c.serviceMu.RLock() - api, ok := c.services[name] - c.serviceMu.RUnlock() - if !ok { - return nil - } - return api + return c.svc.service(name) } // ServiceFor retrieves a registered service by name and asserts its type to the given interface T. diff --git a/pkg/framework/core/interfaces.go b/pkg/framework/core/interfaces.go index 069d8368..0bef944b 100644 --- a/pkg/framework/core/interfaces.go +++ b/pkg/framework/core/interfaces.go @@ -3,7 +3,6 @@ package core import ( "context" "embed" - "sync" ) // This file defines the public API contracts (interfaces) for the services @@ -73,28 +72,13 @@ type Stoppable interface { // Core is the central application object that manages services, assets, and communication. type Core struct { - App any // GUI runtime (e.g., Wails App) - set by WithApp option - assets embed.FS - Features *Features - serviceLock bool - ipcMu sync.RWMutex - ipcHandlers []func(*Core, Message) error - queryMu sync.RWMutex - queryHandlers []QueryHandler - taskMu sync.RWMutex - taskHandlers []TaskHandler - serviceMu sync.RWMutex - services map[string]any - servicesLocked bool - startables []Startable - stoppables []Stoppable + App any // GUI runtime (e.g., Wails App) - set by WithApp option + assets embed.FS + Features *Features + svc *serviceManager + bus *messageBus } -var ( - instance *Core - instanceMu sync.RWMutex -) - // Config provides access to application configuration. type Config interface { // Get retrieves a configuration value by key and stores it in the 'out' variable. diff --git a/pkg/framework/core/message_bus.go b/pkg/framework/core/message_bus.go new file mode 100644 index 00000000..c01b3cc6 --- /dev/null +++ b/pkg/framework/core/message_bus.go @@ -0,0 +1,120 @@ +package core + +import ( + "errors" + "fmt" + "sync" +) + +// messageBus owns the IPC action, query, and task dispatch. +// It is an unexported component used internally by Core. +type messageBus struct { + core *Core + + ipcMu sync.RWMutex + ipcHandlers []func(*Core, Message) error + + queryMu sync.RWMutex + queryHandlers []QueryHandler + + taskMu sync.RWMutex + taskHandlers []TaskHandler +} + +// newMessageBus creates an empty message bus bound to the given Core. +func newMessageBus(c *Core) *messageBus { + return &messageBus{core: c} +} + +// action dispatches a message to all registered IPC handlers. +func (b *messageBus) action(msg Message) error { + b.ipcMu.RLock() + handlers := append([]func(*Core, Message) error(nil), b.ipcHandlers...) + b.ipcMu.RUnlock() + + var agg error + for _, h := range handlers { + if err := h(b.core, msg); err != nil { + agg = fmt.Errorf("%w; %v", agg, err) + } + } + return agg +} + +// registerAction adds a single IPC handler. +func (b *messageBus) registerAction(handler func(*Core, Message) error) { + b.ipcMu.Lock() + b.ipcHandlers = append(b.ipcHandlers, handler) + b.ipcMu.Unlock() +} + +// registerActions adds multiple IPC handlers. +func (b *messageBus) registerActions(handlers ...func(*Core, Message) error) { + b.ipcMu.Lock() + b.ipcHandlers = append(b.ipcHandlers, handlers...) + b.ipcMu.Unlock() +} + +// query dispatches a query to handlers until one responds. +func (b *messageBus) query(q Query) (any, bool, error) { + b.queryMu.RLock() + handlers := append([]QueryHandler(nil), b.queryHandlers...) + b.queryMu.RUnlock() + + for _, h := range handlers { + result, handled, err := h(b.core, q) + if handled { + return result, true, err + } + } + return nil, false, nil +} + +// queryAll dispatches a query to all handlers and collects all responses. +func (b *messageBus) queryAll(q Query) ([]any, error) { + b.queryMu.RLock() + handlers := append([]QueryHandler(nil), b.queryHandlers...) + b.queryMu.RUnlock() + + var results []any + var agg error + for _, h := range handlers { + result, handled, err := h(b.core, q) + if err != nil { + agg = errors.Join(agg, err) + } + if handled && result != nil { + results = append(results, result) + } + } + return results, agg +} + +// registerQuery adds a query handler. +func (b *messageBus) registerQuery(handler QueryHandler) { + b.queryMu.Lock() + b.queryHandlers = append(b.queryHandlers, handler) + b.queryMu.Unlock() +} + +// perform dispatches a task to handlers until one executes it. +func (b *messageBus) perform(t Task) (any, bool, error) { + b.taskMu.RLock() + handlers := append([]TaskHandler(nil), b.taskHandlers...) + b.taskMu.RUnlock() + + for _, h := range handlers { + result, handled, err := h(b.core, t) + if handled { + return result, true, err + } + } + return nil, false, nil +} + +// registerTask adds a task handler. +func (b *messageBus) registerTask(handler TaskHandler) { + b.taskMu.Lock() + b.taskHandlers = append(b.taskHandlers, handler) + b.taskMu.Unlock() +} diff --git a/pkg/framework/core/message_bus_test.go b/pkg/framework/core/message_bus_test.go new file mode 100644 index 00000000..d661dc73 --- /dev/null +++ b/pkg/framework/core/message_bus_test.go @@ -0,0 +1,129 @@ +package core + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMessageBus_Action_Good(t *testing.T) { + c, _ := New() + + var received []Message + c.bus.registerAction(func(_ *Core, msg Message) error { + received = append(received, msg) + return nil + }) + c.bus.registerAction(func(_ *Core, msg Message) error { + received = append(received, msg) + return nil + }) + + err := c.bus.action("hello") + assert.NoError(t, err) + assert.Len(t, received, 2) +} + +func TestMessageBus_RegisterAction_Good(t *testing.T) { + c, _ := New() + + var coreRef *Core + c.bus.registerAction(func(core *Core, msg Message) error { + coreRef = core + return nil + }) + + _ = c.bus.action(nil) + assert.Same(t, c, coreRef, "handler should receive the Core reference") +} + +func TestMessageBus_Query_Good(t *testing.T) { + c, _ := New() + + c.bus.registerQuery(func(_ *Core, q Query) (any, bool, error) { + return "first", true, nil + }) + + result, handled, err := c.bus.query(TestQuery{Value: "test"}) + assert.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, "first", result) +} + +func TestMessageBus_QueryAll_Good(t *testing.T) { + c, _ := New() + + c.bus.registerQuery(func(_ *Core, q Query) (any, bool, error) { + return "a", true, nil + }) + c.bus.registerQuery(func(_ *Core, q Query) (any, bool, error) { + return nil, false, nil // skips + }) + c.bus.registerQuery(func(_ *Core, q Query) (any, bool, error) { + return "b", true, nil + }) + + results, err := c.bus.queryAll(TestQuery{}) + assert.NoError(t, err) + assert.Equal(t, []any{"a", "b"}, results) +} + +func TestMessageBus_Perform_Good(t *testing.T) { + c, _ := New() + + c.bus.registerTask(func(_ *Core, t Task) (any, bool, error) { + return "done", true, nil + }) + + result, handled, err := c.bus.perform(TestTask{}) + assert.NoError(t, err) + assert.True(t, handled) + assert.Equal(t, "done", result) +} + +func TestMessageBus_ConcurrentAccess_Good(t *testing.T) { + c, _ := New() + + var wg sync.WaitGroup + const goroutines = 20 + + // Concurrent register + dispatch + for i := 0; i < goroutines; i++ { + wg.Add(2) + go func() { + defer wg.Done() + c.bus.registerAction(func(_ *Core, msg Message) error { return nil }) + }() + go func() { + defer wg.Done() + _ = c.bus.action("ping") + }() + } + + for i := 0; i < goroutines; i++ { + wg.Add(2) + go func() { + defer wg.Done() + c.bus.registerQuery(func(_ *Core, q Query) (any, bool, error) { return nil, false, nil }) + }() + go func() { + defer wg.Done() + _, _ = c.bus.queryAll(TestQuery{}) + }() + } + + for i := 0; i < goroutines; i++ { + wg.Add(2) + go func() { + defer wg.Done() + c.bus.registerTask(func(_ *Core, t Task) (any, bool, error) { return nil, false, nil }) + }() + go func() { + defer wg.Done() + _, _, _ = c.bus.perform(TestTask{}) + }() + } + + wg.Wait() +} diff --git a/pkg/framework/core/service_manager.go b/pkg/framework/core/service_manager.go new file mode 100644 index 00000000..b2996c47 --- /dev/null +++ b/pkg/framework/core/service_manager.go @@ -0,0 +1,90 @@ +package core + +import ( + "fmt" + "sync" +) + +// serviceManager owns the service registry and lifecycle tracking. +// It is an unexported component used internally by Core. +type serviceManager struct { + mu sync.RWMutex + services map[string]any + startables []Startable + stoppables []Stoppable + lockEnabled bool // WithServiceLock was called + locked bool // lock applied after New() completes +} + +// newServiceManager creates an empty service manager. +func newServiceManager() *serviceManager { + return &serviceManager{ + services: make(map[string]any), + } +} + +// registerService adds a named service to the registry. +// It also appends to startables/stoppables if the service implements those interfaces. +func (m *serviceManager) registerService(name string, svc any) error { + if m.locked { + return fmt.Errorf("core: service %q is not permitted by the serviceLock setting", name) + } + if name == "" { + return fmt.Errorf("core: service name cannot be empty") + } + m.mu.Lock() + defer m.mu.Unlock() + if _, exists := m.services[name]; exists { + return fmt.Errorf("core: service %q already registered", name) + } + m.services[name] = svc + + if s, ok := svc.(Startable); ok { + m.startables = append(m.startables, s) + } + if s, ok := svc.(Stoppable); ok { + m.stoppables = append(m.stoppables, s) + } + + return nil +} + +// service retrieves a registered service by name, or nil if not found. +func (m *serviceManager) service(name string) any { + m.mu.RLock() + svc, ok := m.services[name] + m.mu.RUnlock() + if !ok { + return nil + } + return svc +} + +// enableLock marks that the lock should be applied after initialisation. +func (m *serviceManager) enableLock() { + m.lockEnabled = true +} + +// applyLock activates the service lock if it was enabled. +// Called once during New() after all options have been processed. +func (m *serviceManager) applyLock() { + if m.lockEnabled { + m.locked = true + } +} + +// getStartables returns a snapshot copy of the startables slice. +func (m *serviceManager) getStartables() []Startable { + m.mu.RLock() + out := append([]Startable(nil), m.startables...) + m.mu.RUnlock() + return out +} + +// getStoppables returns a snapshot copy of the stoppables slice. +func (m *serviceManager) getStoppables() []Stoppable { + m.mu.RLock() + out := append([]Stoppable(nil), m.stoppables...) + m.mu.RUnlock() + return out +} diff --git a/pkg/framework/core/service_manager_test.go b/pkg/framework/core/service_manager_test.go new file mode 100644 index 00000000..40ada336 --- /dev/null +++ b/pkg/framework/core/service_manager_test.go @@ -0,0 +1,135 @@ +package core + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestServiceManager_RegisterService_Good(t *testing.T) { + m := newServiceManager() + + err := m.registerService("svc1", &MockService{Name: "one"}) + assert.NoError(t, err) + + got := m.service("svc1") + assert.NotNil(t, got) + assert.Equal(t, "one", got.(*MockService).GetName()) +} + +func TestServiceManager_RegisterService_Bad(t *testing.T) { + m := newServiceManager() + + // Empty name + err := m.registerService("", &MockService{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot be empty") + + // Duplicate + err = m.registerService("dup", &MockService{}) + assert.NoError(t, err) + err = m.registerService("dup", &MockService{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "already registered") + + // Locked + m2 := newServiceManager() + m2.enableLock() + m2.applyLock() + err = m2.registerService("late", &MockService{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "serviceLock") +} + +func TestServiceManager_ServiceNotFound_Good(t *testing.T) { + m := newServiceManager() + assert.Nil(t, m.service("nonexistent")) +} + +func TestServiceManager_Startables_Good(t *testing.T) { + m := newServiceManager() + + s1 := &MockStartable{} + s2 := &MockStartable{} + + _ = m.registerService("s1", s1) + _ = m.registerService("s2", s2) + + startables := m.getStartables() + assert.Len(t, startables, 2) + + // Verify order matches registration order + assert.Same(t, s1, startables[0]) + assert.Same(t, s2, startables[1]) + + // Verify it's a copy — mutating the slice doesn't affect internal state + startables[0] = nil + assert.Len(t, m.getStartables(), 2) + assert.NotNil(t, m.getStartables()[0]) +} + +func TestServiceManager_Stoppables_Good(t *testing.T) { + m := newServiceManager() + + s1 := &MockStoppable{} + s2 := &MockStoppable{} + + _ = m.registerService("s1", s1) + _ = m.registerService("s2", s2) + + stoppables := m.getStoppables() + assert.Len(t, stoppables, 2) + + // Stoppables are returned in registration order; Core.ServiceShutdown reverses them + assert.Same(t, s1, stoppables[0]) + assert.Same(t, s2, stoppables[1]) +} + +func TestServiceManager_Lock_Good(t *testing.T) { + m := newServiceManager() + + // Register before lock — should succeed + err := m.registerService("early", &MockService{}) + assert.NoError(t, err) + + // Enable and apply lock + m.enableLock() + m.applyLock() + + // Register after lock — should fail + err = m.registerService("late", &MockService{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "serviceLock") + + // Early service is still accessible + assert.NotNil(t, m.service("early")) +} + +func TestServiceManager_LockNotAppliedWithoutEnable_Good(t *testing.T) { + m := newServiceManager() + m.applyLock() // applyLock without enableLock should be a no-op + + err := m.registerService("svc", &MockService{}) + assert.NoError(t, err) +} + +type mockFullLifecycle struct { + startOrder int + stopOrder int +} + +func (m *mockFullLifecycle) OnStartup(_ context.Context) error { return nil } +func (m *mockFullLifecycle) OnShutdown(_ context.Context) error { return nil } + +func TestServiceManager_LifecycleBoth_Good(t *testing.T) { + m := newServiceManager() + + svc := &mockFullLifecycle{} + err := m.registerService("both", svc) + assert.NoError(t, err) + + // Should appear in both startables and stoppables + assert.Len(t, m.getStartables(), 1) + assert.Len(t, m.getStoppables(), 1) +}