Virgil edited this page 2026-02-19 17:29:53 +00:00

Message Bus

The Core message bus provides three communication patterns for inter-service messaging: ACTION (fire-and-forget broadcasts), QUERY (read-only first-responder), and TASK (side-effects first-responder).

ACTION (Broadcast)

Fire-and-forget messages delivered to all registered handlers:

// Register handler
core.RegisterAction(func(c *core.Core, msg core.Message) error {
    switch m := msg.(type) {
    case UserCreated:
        log.Printf("User created: %s", m.Email)
    }
    return nil
})

// Broadcast
core.ACTION(UserCreated{UserID: 42, Email: "user@example.com"})
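
To make the "delivered to all registered handlers" behaviour concrete, here is a minimal, self-contained sketch of a broadcast dispatcher. It does not use the Core package: `Bus`, `ActionHandler`, `Action`, and `countDeliveries` are hypothetical names invented for illustration. How Core itself treats a handler error (stop or continue) is not stated on this page, so the "keep going and return the first error" policy below is an assumption.

```go
package main

import "fmt"

// Message, ActionHandler and Bus are hypothetical stand-ins, not Core types.
type Message any

type UserCreated struct {
	UserID int
	Email  string
}

type ActionHandler func(msg Message) error

type Bus struct {
	handlers []ActionHandler
}

// RegisterAction appends a handler; handlers run in registration order.
func (b *Bus) RegisterAction(h ActionHandler) {
	b.handlers = append(b.handlers, h)
}

// Action delivers msg to every handler. Assumption: a failing handler does
// not stop delivery; the first error seen is returned once all have run.
func (b *Bus) Action(msg Message) error {
	var firstErr error
	for _, h := range b.handlers {
		if err := h(msg); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// countDeliveries registers n handlers and reports how many of them
// received the broadcast UserCreated message.
func countDeliveries(n int) int {
	bus := &Bus{}
	seen := 0
	for i := 0; i < n; i++ {
		bus.RegisterAction(func(msg Message) error {
			if _, ok := msg.(UserCreated); ok {
				seen++
			}
			return nil
		})
	}
	_ = bus.Action(UserCreated{UserID: 42, Email: "user@example.com"})
	return seen
}

func main() {
	fmt.Println(countDeliveries(3)) // prints 3
}
```

Each handler filters by message type itself, which is why the handler in the example above uses a type switch: the bus does no routing beyond calling everyone.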

Built-in action messages:

  • ActionServiceStartup -- services starting
  • ActionServiceShutdown -- services stopping
  • ActionTaskStarted -- background task started
  • ActionTaskProgress -- progress update (0.0 to 1.0)
  • ActionTaskCompleted -- task finished with result/error

Services with a HandleIPCEvents(c *Core, msg Message) error method are auto-registered as action handlers during WithService().
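
One plausible way such auto-registration could work is a type assertion against a small interface when the service is added. The sketch below is not Core's implementation: `Core`, `ipcHandler`, `addService`, `auditService`, and `plainService` are hypothetical stand-ins, and only the `HandleIPCEvents` method shape is taken from the text above.

```go
package main

import "fmt"

// Message and Core are hypothetical stand-ins for the real Core types.
type Message any

type Core struct {
	actions []func(c *Core, msg Message) error
}

// ipcHandler matches any service exposing HandleIPCEvents with the
// signature described above.
type ipcHandler interface {
	HandleIPCEvents(c *Core, msg Message) error
}

// addService wires svc in as an action handler if it implements
// ipcHandler, and reports whether it did so.
func (c *Core) addService(svc any) bool {
	h, ok := svc.(ipcHandler)
	if !ok {
		return false
	}
	c.actions = append(c.actions, h.HandleIPCEvents)
	return true
}

// auditService has the method, so it would be picked up.
type auditService struct{ events int }

func (a *auditService) HandleIPCEvents(c *Core, msg Message) error {
	a.events++
	return nil
}

// plainService has no such method, so it is left alone.
type plainService struct{}

func main() {
	c := &Core{}
	fmt.Println(c.addService(&auditService{})) // prints true
	fmt.Println(c.addService(plainService{}))  // prints false
	fmt.Println(len(c.actions))                // prints 1
}
```

The practical consequence is that a service opts in simply by defining the method; no explicit registration call is needed.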

QUERY (Read-Only, First-Responder)

Queries stop at the first handler that returns handled=true:

// Register handler
core.RegisterQuery(func(c *core.Core, q core.Query) (any, bool, error) {
    if qu, ok := q.(QueryUserByID); ok {
        user := findUser(qu.ID)
        return user, true, nil   // handled=true stops the chain
    }
    return nil, false, nil       // handled=false continues
})

// Execute
user, handled, err := core.QUERY(QueryUserByID{ID: 42})

// Query all handlers (collects all responses)
results, err := core.QUERYALL(QueryUserByID{ID: 42})

TASK (Side-Effects, First-Responder)

TASK uses the same first-responder dispatch as QUERY, but is intended for operations with side effects:

// Register handler
core.RegisterTask(func(c *core.Core, t core.Task) (any, bool, error) {
    if ct, ok := t.(CreateFileTask); ok {
        err := os.WriteFile(ct.Path, []byte(ct.Content), 0644)
        return ct.Path, true, err
    }
    return nil, false, nil
})

// Synchronous
result, handled, err := core.PERFORM(CreateFileTask{Path: "/tmp/file.txt", Content: "Hello"})

// Asynchronous (returns task ID immediately)
taskID := core.PerformAsync(CreateFileTask{...})

// Report progress on long tasks
core.Progress(taskID, 0.5, "Processing...", task)
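
The asynchronous flow (get an ID back immediately, report progress while the work runs) can be sketched as follows. This is an illustration under stated assumptions, not Core's implementation: `Runner`, `Progress`, `PerformAsync`, `Wait`, and the `task-N` ID format are all hypothetical, and progress updates are simply recorded in a slice rather than broadcast as action messages.

```go
package main

import (
	"fmt"
	"sync"
)

// Progress is a hypothetical record of one progress update.
type Progress struct {
	TaskID  string
	Percent float64 // 0.0 to 1.0
	Message string
}

// Runner is a hypothetical stand-in for the part of a bus that runs tasks.
type Runner struct {
	mu     sync.Mutex
	nextID int
	wg     sync.WaitGroup
	events []Progress
}

// report records a progress update; safe to call from any goroutine.
func (r *Runner) report(id string, pct float64, msg string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, Progress{TaskID: id, Percent: pct, Message: msg})
}

// PerformAsync assigns a task ID, starts work in a goroutine, and returns
// the ID without waiting for the work to finish.
func (r *Runner) PerformAsync(work func(report func(pct float64, msg string))) string {
	r.mu.Lock()
	r.nextID++
	id := fmt.Sprintf("task-%d", r.nextID)
	r.mu.Unlock()

	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		work(func(pct float64, msg string) { r.report(id, pct, msg) })
	}()
	return id
}

// Wait blocks until all started tasks finish and returns a copy of the
// recorded progress updates.
func (r *Runner) Wait() []Progress {
	r.wg.Wait()
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]Progress(nil), r.events...)
}

// runDemo starts one task that reports twice, then waits for it.
func runDemo() (string, []Progress) {
	r := &Runner{}
	id := r.PerformAsync(func(report func(pct float64, msg string)) {
		report(0.5, "Processing...")
		report(1.0, "Done")
	})
	return id, r.Wait()
}

func main() {
	id, events := runDemo()
	fmt.Println(id, len(events)) // prints: task-1 2
}
```

The ID is what ties later progress updates back to the call that started the work, which is why the asynchronous variant returns it instead of a result.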

Pattern Summary

Pattern   Delivery          Return                     Use Case
ACTION    All handlers      error only                 Events, notifications
QUERY     First responder   value + handled + error    Data lookups
TASK      First responder   value + handled + error    Operations with side effects

See Service-Lifecycle for service registration and Service-Runtime for embedding Core access in services.