Plans 1-5 complete for core/go scope. 456 tests, 84.4% coverage, 100% AX-7 naming.
Critical bugs (Plan 1):
- P4-3+P7-3: ACTION broadcast calls all handlers with panic recovery
- P7-2+P7-4: RunE() with defer ServiceShutdown, Run() delegates
- P3-1: Startable/Stoppable return Result (breaking, clean)
- P9-1: Zero os/exec — App.Find() rewritten with os.Stat+PATH
- I3: Embed() removed, I15: New() comment fixed
- I9: CommandLifecycle removed → Command.Managed field
Registry[T] (Plan 2):
- Universal thread-safe named collection with 3 lock modes
- All 5 registries migrated: services, commands, drive, data, lock
- Insertion order preserved (fixes P4-1)
- c.RegistryOf("name") cross-cutting accessor
Action/Task system (Plan 3):
- Action type with Run()/Exists(), ActionHandler signature
- c.Action("name") dual-purpose accessor (register/invoke)
- TaskDef with Steps — sequential chain, async dispatch, previous-input piping
- Panic recovery on all Action execution
- broadcast() internal, ACTION() sugar
Process primitive (Plan 4):
- c.Process() returns Action sugar — Run/RunIn/RunWithEnv/Start/Kill/Exists
- No deps added — delegates to c.Action("process.*")
- Permission-by-registration: no handler = no capability
Missing primitives (Plan 5):
- core.ID() — atomic counter + crypto/rand suffix
- ValidateName() / SanitisePath() — reusable validation
- Fs.WriteAtomic() — write-to-temp-then-rename
- Fs.NewUnrestricted() / Fs.Root() — legitimate sandbox bypass
- AX-7: 456/456 tests renamed to TestFile_Function_{Good,Bad,Ugly}
Co-Authored-By: Virgil <virgil@lethean.io>
112 lines
2.7 KiB
Go
112 lines
2.7 KiB
Go
// SPDX-License-Identifier: EUPL-1.2
|
|
|
|
// Message bus for the Core framework.
// Dispatches actions (fire-and-forget), queries (first responder),
// and tasks (first executor) between registered handlers.
|
|
|
|
package core
|
|
|
|
import (
|
|
"slices"
|
|
"sync"
|
|
)
|
|
|
|
// Ipc holds IPC dispatch data and the named action registry.
|
|
//
|
|
// ipc := (&core.Ipc{}).New()
|
|
type Ipc struct {
|
|
ipcMu sync.RWMutex
|
|
ipcHandlers []func(*Core, Message) Result
|
|
|
|
queryMu sync.RWMutex
|
|
queryHandlers []QueryHandler
|
|
|
|
taskMu sync.RWMutex
|
|
taskHandlers []TaskHandler
|
|
|
|
actions *Registry[*Action] // named action registry
|
|
tasks *Registry[*TaskDef] // named task registry
|
|
}
|
|
|
|
// broadcast dispatches a message to all registered IPC handlers.
|
|
// Each handler is wrapped in panic recovery. All handlers fire regardless of individual results.
|
|
func (c *Core) broadcast(msg Message) Result {
|
|
c.ipc.ipcMu.RLock()
|
|
handlers := slices.Clone(c.ipc.ipcHandlers)
|
|
c.ipc.ipcMu.RUnlock()
|
|
|
|
for _, h := range handlers {
|
|
func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
Error("ACTION handler panicked", "panic", r)
|
|
}
|
|
}()
|
|
h(c, msg)
|
|
}()
|
|
}
|
|
return Result{OK: true}
|
|
}
|
|
|
|
func (c *Core) Query(q Query) Result {
|
|
c.ipc.queryMu.RLock()
|
|
handlers := slices.Clone(c.ipc.queryHandlers)
|
|
c.ipc.queryMu.RUnlock()
|
|
|
|
for _, h := range handlers {
|
|
r := h(c, q)
|
|
if r.OK {
|
|
return r
|
|
}
|
|
}
|
|
return Result{}
|
|
}
|
|
|
|
func (c *Core) QueryAll(q Query) Result {
|
|
c.ipc.queryMu.RLock()
|
|
handlers := slices.Clone(c.ipc.queryHandlers)
|
|
c.ipc.queryMu.RUnlock()
|
|
|
|
var results []any
|
|
for _, h := range handlers {
|
|
r := h(c, q)
|
|
if r.OK && r.Value != nil {
|
|
results = append(results, r.Value)
|
|
}
|
|
}
|
|
return Result{results, true}
|
|
}
|
|
|
|
func (c *Core) RegisterQuery(handler QueryHandler) {
|
|
c.ipc.queryMu.Lock()
|
|
c.ipc.queryHandlers = append(c.ipc.queryHandlers, handler)
|
|
c.ipc.queryMu.Unlock()
|
|
}
|
|
|
|
// --- IPC Registration (handlers) ---
|
|
|
|
// RegisterAction registers a broadcast handler for ACTION messages.
|
|
//
|
|
// c.RegisterAction(func(c *core.Core, msg core.Message) core.Result {
|
|
// if ev, ok := msg.(AgentCompleted); ok { ... }
|
|
// return core.Result{OK: true}
|
|
// })
|
|
func (c *Core) RegisterAction(handler func(*Core, Message) Result) {
|
|
c.ipc.ipcMu.Lock()
|
|
c.ipc.ipcHandlers = append(c.ipc.ipcHandlers, handler)
|
|
c.ipc.ipcMu.Unlock()
|
|
}
|
|
|
|
// RegisterActions registers multiple broadcast handlers.
|
|
func (c *Core) RegisterActions(handlers ...func(*Core, Message) Result) {
|
|
c.ipc.ipcMu.Lock()
|
|
c.ipc.ipcHandlers = append(c.ipc.ipcHandlers, handlers...)
|
|
c.ipc.ipcMu.Unlock()
|
|
}
|
|
|
|
// RegisterTask registers a handler for PERFORM task dispatch.
|
|
func (c *Core) RegisterTask(handler TaskHandler) {
|
|
c.ipc.taskMu.Lock()
|
|
c.ipc.taskHandlers = append(c.ipc.taskHandlers, handler)
|
|
c.ipc.taskMu.Unlock()
|
|
}
|