go/docs/messaging.md
Snider 89d189dd95 docs: add human-friendly documentation for Core Go framework
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 13:02:37 +00:00

---
title: Messaging
description: ACTION, QUERY, and PERFORM -- the message bus for decoupled service communication.
---
# Messaging
The message bus enables services to communicate without importing each other. It supports three patterns:
| Pattern | Method | Semantics |
|---------|--------|-----------|
| **ACTION** | `c.ACTION(msg)` | Broadcast to all handlers (fire-and-forget) |
| **QUERY** | `c.QUERY(q)` | First responder wins (read-only) |
| **PERFORM** | `c.PERFORM(t)` | First responder executes (side effects) |
All three are type-safe at the handler level through Go type switches, while the bus itself uses `any` to avoid import cycles.
## Message Types
```go
// Any struct can be a message -- no interface to implement.
type Message any // Used with ACTION
type Query any // Used with QUERY / QUERYALL
type Task any // Used with PERFORM / PerformAsync
```
Define your message types as plain structs:
```go
// In your package
type UserCreated struct {
	UserID string
	Email  string
}

type GetUserCount struct{}

type SendEmail struct {
	To      string
	Subject string
	Body    string
}
```
## ACTION -- Broadcast
`ACTION` dispatches a message to **every** registered action handler. Handlers receive the message and can inspect it via type switch. All handlers are called regardless of whether they handle the specific message type.
### Dispatching
```go
err := c.ACTION(UserCreated{UserID: "123", Email: "user@example.com"})
```
Errors from all handlers are aggregated with `errors.Join`. If no handlers are registered, `ACTION` returns `nil`.
### Handling
```go
c.RegisterAction(func(c *core.Core, msg core.Message) error {
	switch m := msg.(type) {
	case UserCreated:
		fmt.Printf("New user: %s (%s)\n", m.UserID, m.Email)
	}
	return nil
})
```
You can register multiple handlers. Each handler receives every message -- use a type switch to filter.
```go
// Register multiple handlers at once
c.RegisterActions(handler1, handler2, handler3)
```
### Auto-Discovery
If a service registered via `WithService` has a method called `HandleIPCEvents` with the signature `func(*Core, Message) error`, it is automatically registered as an action handler:
```go
type Service struct{}

func (s *Service) HandleIPCEvents(c *core.Core, msg core.Message) error {
	switch msg.(type) {
	case UserCreated:
		// react to event
	}
	return nil
}
```
## QUERY -- Request/Response
`QUERY` dispatches a query to handlers in registration order. The **first** handler that returns `handled == true` wins -- subsequent handlers are not called.
### Dispatching
```go
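result, handled, err := c.QUERY(GetUserCount{})
if err != nil {
	// handle the error
}
if !handled {
	// no handler recognised this query; do not use result
}
// Use the checked form: a bare result.(int) panics if result is not an int.
count, ok := result.(int)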
```
### Handling
```go
c.RegisterQuery(func(c *core.Core, q core.Query) (any, bool, error) {
	switch q.(type) {
	case GetUserCount:
		return 42, true, nil
	}
	return nil, false, nil // not handled -- pass to next handler
})
```
Return `false` for `handled` to let the query fall through to the next handler.
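The first-responder rule can be sketched as a plain loop. This is an illustrative model rather than the Core source: `QueryHandler` and `dispatch` are hypothetical names, and the choice to ignore errors from handlers that decline is an assumption of the sketch.

```go
package main

import "fmt"

// QueryHandler is a hypothetical stand-in for a registered query handler.
type QueryHandler func(q any) (result any, handled bool, err error)

type GetUserCount struct{}

// dispatch tries handlers in registration order and stops at the first
// one that reports handled == true.
func dispatch(handlers []QueryHandler, q any) (any, bool, error) {
	for _, h := range handlers {
		result, handled, err := h(q)
		if handled {
			return result, true, err
		}
		// Assumption: a handler that declines contributes nothing.
	}
	return nil, false, nil
}

func main() {
	handlers := []QueryHandler{
		func(q any) (any, bool, error) { return nil, false, nil }, // declines
		func(q any) (any, bool, error) {
			if _, ok := q.(GetUserCount); ok {
				return 42, true, nil
			}
			return nil, false, nil
		},
		func(q any) (any, bool, error) { return 99, true, nil }, // never reached
	}
	result, handled, _ := dispatch(handlers, GetUserCount{})
	fmt.Println(result, handled) // 42 true
}
```

Registration order therefore matters: a broad handler registered early can shadow a more specific one registered later.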
### QUERYALL -- Collect All Responses
`QUERYALL` calls **every** handler and collects all responses where `handled == true`:
```go
results, err := c.QUERYALL(GetPluginInfo{})
// results contains one entry per handler that responded
```
Errors from all handlers are aggregated. Results from handlers that returned `handled == false` or `result == nil` are excluded.
## PERFORM -- Execute a Task
`PERFORM` dispatches a task to handlers in registration order. Like `QUERY`, the first handler that returns `handled == true` wins.
### Dispatching
```go
result, handled, err := c.PERFORM(SendEmail{
	To:      "user@example.com",
	Subject: "Welcome",
	Body:    "Hello!",
})
if !handled {
	// no handler could execute this task
}
```
### Handling
```go
c.RegisterTask(func(c *core.Core, t core.Task) (any, bool, error) {
	switch m := t.(type) {
	case SendEmail:
		err := sendMail(m.To, m.Subject, m.Body)
		return nil, true, err
	}
	return nil, false, nil
})
```
## PerformAsync -- Background Tasks
`PerformAsync` dispatches a task to be executed in a background goroutine. It returns a task ID immediately.
```go
taskID := c.PerformAsync(SendEmail{
	To:      "user@example.com",
	Subject: "Report",
	Body:    "...",
})
// taskID is something like "task-1"
```
The lifecycle of an async task produces three action messages:
| Message | When |
|---------|------|
| `ActionTaskStarted{TaskID, Task}` | Immediately, before execution begins |
| `ActionTaskProgress{TaskID, Task, Progress, Message}` | When `c.Progress()` is called |
| `ActionTaskCompleted{TaskID, Task, Result, Error}` | After execution finishes |
### Listening for Completion
```go
c.RegisterAction(func(c *core.Core, msg core.Message) error {
	switch m := msg.(type) {
	case core.ActionTaskCompleted:
		fmt.Printf("Task %s finished: result=%v err=%v\n",
			m.TaskID, m.Result, m.Error)
	}
	return nil
})
```
### Reporting Progress
Call `c.Progress` from within a task handler, or from anywhere else that has the task ID:
```go
c.Progress(taskID, 0.5, "halfway done", myTask)
```
This broadcasts an `ActionTaskProgress` message.
### TaskWithID
If your task struct implements `TaskWithID`, `PerformAsync` will inject the assigned task ID before dispatching:
```go
type TaskWithID interface {
	Task
	SetTaskID(id string)
	GetTaskID() string
}
```
```go
type MyLongTask struct {
	id string
}

// Pointer receivers: pass &MyLongTask{} so the value satisfies TaskWithID.
func (t *MyLongTask) SetTaskID(id string) { t.id = id }
func (t *MyLongTask) GetTaskID() string   { return t.id }
```
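The injection step presumably comes down to an interface check on the task value. The sketch below shows that check in isolation; `injectID` is a hypothetical helper, not a Core function, and the types are redeclared locally so the example compiles on its own. It also shows why the task must be passed by pointer when the methods have pointer receivers.

```go
package main

import "fmt"

type Task any

type TaskWithID interface {
	Task
	SetTaskID(id string)
	GetTaskID() string
}

type MyLongTask struct {
	id string
}

func (t *MyLongTask) SetTaskID(id string) { t.id = id }
func (t *MyLongTask) GetTaskID() string   { return t.id }

// injectID is a hypothetical helper: it sets the ID only when the task
// implements TaskWithID, and reports whether it did.
func injectID(task Task, id string) bool {
	if t, ok := task.(TaskWithID); ok {
		t.SetTaskID(id)
		return true
	}
	return false
}

func main() {
	byPointer := &MyLongTask{}
	fmt.Println(injectID(byPointer, "task-1"), byPointer.GetTaskID()) // true task-1

	// A plain value does not have the pointer-receiver methods in its
	// method set, so it does not satisfy the interface.
	fmt.Println(injectID(MyLongTask{}, "task-2")) // false
}
```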
### Shutdown Behaviour
- `PerformAsync` returns an empty string if the Core is already shut down.
- `ServiceShutdown` waits for all in-flight async tasks to complete (respecting the context deadline).
## Real-World Example: Log Service
The `pkg/log` service demonstrates both query and task handling:
```go
// Query type: "what is the current log level?"
type QueryLevel struct{}

// Task type: "change the log level"
type TaskSetLevel struct {
	Level Level
}

func (s *Service) OnStartup(ctx context.Context) error {
	s.Core().RegisterQuery(s.handleQuery)
	s.Core().RegisterTask(s.handleTask)
	return nil
}

func (s *Service) handleQuery(c *core.Core, q core.Query) (any, bool, error) {
	switch q.(type) {
	case QueryLevel:
		return s.Level(), true, nil
	}
	return nil, false, nil
}

func (s *Service) handleTask(c *core.Core, t core.Task) (any, bool, error) {
	switch m := t.(type) {
	case TaskSetLevel:
		s.SetLevel(m.Level)
		return nil, true, nil
	}
	return nil, false, nil
}
```
Other services can query or change the log level without holding a reference to the log service itself. They import only its message types:
```go
// Query the level
level, handled, _ := c.QUERY(log.QueryLevel{})
// Change the level
_, _, _ = c.PERFORM(log.TaskSetLevel{Level: log.LevelDebug})
```
## Thread Safety
The message bus guards each handler list (actions, queries, tasks) with its own `sync.RWMutex`, so registration and dispatch are safe to call concurrently from multiple goroutines. Each dispatch works on a cloned snapshot of the handler list, so a handler registered while a dispatch is in progress is first called on the next dispatch.
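The snapshot approach can be sketched in a few lines. This is a simplified model with hypothetical names (`bus`, `register`, `dispatch`), not the Core source. It shows why a handler can safely register another handler mid-dispatch: the lock is released before any handler runs.

```go
package main

import (
	"fmt"
	"sync"
)

type Handler func(msg any)

type bus struct {
	mu       sync.RWMutex
	handlers []Handler
}

func (b *bus) register(h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers = append(b.handlers, h)
}

// dispatch copies the handler list under the read lock, releases the lock,
// then calls the copy. It returns how many handlers were called.
func (b *bus) dispatch(msg any) int {
	b.mu.RLock()
	snapshot := append([]Handler(nil), b.handlers...)
	b.mu.RUnlock()
	for _, h := range snapshot {
		h(msg)
	}
	return len(snapshot)
}

func main() {
	b := &bus{}
	lateCalls := 0
	registered := false
	b.register(func(any) {
		if !registered {
			registered = true
			// Registering during dispatch does not deadlock and does not
			// affect the snapshot currently being iterated.
			b.register(func(any) { lateCalls++ })
		}
	})

	n := b.dispatch("first")
	fmt.Println(n, lateCalls) // 1 0
	n = b.dispatch("second")
	fmt.Println(n, lateCalls) // 2 1
}
```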
## Related Pages
- [Services](services.md) -- how services are registered
- [Lifecycle](lifecycle.md) -- `ActionServiceStartup` and `ActionServiceShutdown` messages
- [Testing](testing.md) -- testing message handlers