113 lines
2.8 KiB
Go
113 lines
2.8 KiB
Go
// SPDX-License-Identifier: EUPL-1.2
|
|
|
|
// Message bus for the Core framework.
|
|
// Dispatches actions (fire-and-forget), queries (first responder),
|
|
// and tasks (first executor) between registered handlers.
|
|
|
|
package core
|
|
|
|
import (
|
|
"slices"
|
|
"sync"
|
|
)
|
|
|
|
// Ipc holds IPC dispatch data and the named action registry.
|
|
//
|
|
// ipc := (&core.Ipc{}).New()
|
|
type Ipc struct {
|
|
ipcMu sync.RWMutex
|
|
ipcHandlers []func(*Core, Message) Result
|
|
|
|
queryMu sync.RWMutex
|
|
queryHandlers []QueryHandler
|
|
|
|
actions *Registry[*Action] // named action registry
|
|
tasks *Registry[*Task] // named task registry
|
|
}
|
|
|
|
// broadcast dispatches a message to all registered IPC handlers.
|
|
// Each handler is wrapped in panic recovery. All handlers fire regardless of individual results.
|
|
func (c *Core) broadcast(msg Message) Result {
|
|
c.ipc.ipcMu.RLock()
|
|
handlers := slices.Clone(c.ipc.ipcHandlers)
|
|
c.ipc.ipcMu.RUnlock()
|
|
|
|
for _, h := range handlers {
|
|
func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
Error("ACTION handler panicked", "panic", r)
|
|
}
|
|
}()
|
|
h(c, msg)
|
|
}()
|
|
}
|
|
return Result{OK: true}
|
|
}
|
|
|
|
// Query dispatches a request — first handler to return OK wins.
|
|
//
|
|
// r := c.Query(MyQuery{})
|
|
func (c *Core) Query(q Query) Result {
|
|
c.ipc.queryMu.RLock()
|
|
handlers := slices.Clone(c.ipc.queryHandlers)
|
|
c.ipc.queryMu.RUnlock()
|
|
|
|
for _, h := range handlers {
|
|
r := h(c, q)
|
|
if r.OK {
|
|
return r
|
|
}
|
|
}
|
|
return Result{}
|
|
}
|
|
|
|
// QueryAll dispatches a request — collects all OK responses.
|
|
//
|
|
// r := c.QueryAll(countQuery{})
|
|
// results := r.Value.([]any)
|
|
func (c *Core) QueryAll(q Query) Result {
|
|
c.ipc.queryMu.RLock()
|
|
handlers := slices.Clone(c.ipc.queryHandlers)
|
|
c.ipc.queryMu.RUnlock()
|
|
|
|
var results []any
|
|
for _, h := range handlers {
|
|
r := h(c, q)
|
|
if r.OK && r.Value != nil {
|
|
results = append(results, r.Value)
|
|
}
|
|
}
|
|
return Result{results, true}
|
|
}
|
|
|
|
// RegisterQuery registers a handler for QUERY dispatch.
|
|
//
|
|
// c.RegisterQuery(func(_ *core.Core, q core.Query) core.Result { ... })
|
|
func (c *Core) RegisterQuery(handler QueryHandler) {
|
|
c.ipc.queryMu.Lock()
|
|
c.ipc.queryHandlers = append(c.ipc.queryHandlers, handler)
|
|
c.ipc.queryMu.Unlock()
|
|
}
|
|
|
|
// --- IPC Registration (handlers) ---
|
|
|
|
// RegisterAction registers a broadcast handler for ACTION messages.
|
|
//
|
|
// c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
|
|
// if ev, ok := msg.(AgentCompleted); ok { ... }
|
|
// return core.Result{OK: true}
|
|
// })
|
|
func (c *Core) RegisterAction(handler func(*Core, Message) Result) {
|
|
c.ipc.ipcMu.Lock()
|
|
c.ipc.ipcHandlers = append(c.ipc.ipcHandlers, handler)
|
|
c.ipc.ipcMu.Unlock()
|
|
}
|
|
|
|
// RegisterActions registers multiple broadcast handlers.
|
|
func (c *Core) RegisterActions(handlers ...func(*Core, Message) Result) {
|
|
c.ipc.ipcMu.Lock()
|
|
c.ipc.ipcHandlers = append(c.ipc.ipcHandlers, handlers...)
|
|
c.ipc.ipcMu.Unlock()
|
|
}
|
|
|