fix(rfc): pass 3 — rewrite Sections 18, 19, 20 to match implementation

Section 18: Removed PERFORM references, ActionDef→Action, TaskDef→Task,
  OnStartup returns Result, removed aspirational patterns (Parallel,
  Conditional, Scheduled), kept what's implemented.

Section 19: Removed old struct definition, Stream returns Result not
  (Stream, error), RemoteAction uses c.RemoteAction() not c.Action(),
  removed stale subsystem map, added Web3 snider.lthn example.

Section 20: Removed migration history (20.1 The Problem, 20.6-20.8),
  kept the API contract. Added 20.6 "What Embeds Registry" reference.

4193 → 1519 lines (64% reduction total).

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-25 16:51:51 +00:00
parent 7069def5b8
commit 377afa0cbe

View file

@ -626,268 +626,139 @@ c.Process().Run(ctx, "git", "log") // executes, returns output
## 18. Action and Task — The Execution Primitives
> Status: Design spec. Replaces the current `ACTION`/`PERFORM` broadcast model
> with named, composable execution units.
An Action is a named, registered callable. A Task is a composed sequence of Actions.
### 18.1 The Concept
The current IPC has three verbs:
- `ACTION(msg)` — broadcast fire-and-forget
- `QUERY(q)` — first responder wins
- `PERFORM(t)` — first executor wins
This works but treats everything as anonymous messages. There's no way to:
- Name a callable and invoke it by name
- Chain callables into flows
- Schedule a callable for later
- Inspect what callables are registered
**Action** is the fix. An Action is a named, registered callable. The atomic unit of work in Core.
### 18.2 core.Action() — The Atomic Unit
### 18.1 Action — The Atomic Unit
```go
// Register a named action
// Register
c.Action("git.log", func(ctx context.Context, opts core.Options) core.Result {
dir := opts.String("dir")
return c.Process().RunIn(ctx, dir, "git", "log", "--oneline", "-20")
return c.Process().RunIn(ctx, dir, "git", "log", "--oneline")
})
// Invoke by name
// Invoke
r := c.Action("git.log").Run(ctx, core.NewOptions(
core.Option{Key: "dir", Value: "/path/to/repo"},
core.Option{Key: "dir", Value: "/repo"},
))
if r.OK {
log := r.Value.(string)
}
// Check if an action exists (permission check)
if c.Action("process.run").Exists() {
// process capability is available
}
// Check capability
c.Action("process.run").Exists() // true if go-process registered
// List all
c.Actions() // []string{"process.run", "agentic.dispatch", ...}
```
`c.Action(name)` is dual-purpose like `c.Service(name)`:
- With a handler arg → registers the action
- Without → returns the action for invocation
`c.Action(name)` is dual-purpose: with handler arg → register; without → return for invocation.
### 18.3 Action Signature
### 18.2 Action Type
```go
// ActionHandler is the function signature for all actions.
type ActionHandler func(context.Context, Options) Result
// ActionDef is a registered action.
type ActionDef struct {
type Action struct {
Name string
Handler ActionHandler
Description string // AX: human + agent readable
Schema Options // declares expected input keys (optional)
Description string
Schema Options // expected input keys
}
```
### 18.4 Where Actions Come From
`Action.Run()` has panic recovery and entitlement checking (Section 21) built in.
Services register their actions during `OnStartup`. This is the same pattern as command registration — services own their capabilities:
### 18.3 Where Actions Come From
Services register during `OnStartup`:
```go
func (s *MyService) OnStartup(ctx context.Context) error {
func (s *MyService) OnStartup(ctx context.Context) core.Result {
c := s.Core()
c.Action("process.run", s.handleRun)
c.Action("process.start", s.handleStart)
c.Action("process.kill", s.handleKill)
c.Action("git.clone", s.handleGitClone)
c.Action("git.push", s.handleGitPush)
return nil
return core.Result{OK: true}
}
```
go-process registers `process.*` actions. core/agent registers `agentic.*` actions. The action namespace IS the capability map.
The action namespace IS the capability map. go-process registers `process.*`, core/agent registers `agentic.*`.
### 18.5 The Permission Model
### 18.4 Permission Model
If `process.run` is not registered, calling it returns `Result{OK: false}`. This is the same "registration IS permission" model from Section 17.7, but generalised to ALL capabilities:
Three states for any action:
| State | `Exists()` | `Entitled()` | `Run()` |
|-------|-----------|-------------|---------|
| Not registered | false | — | `Result{OK: false}` not registered |
| Registered, not entitled | true | false | `Result{OK: false}` not entitled |
| Registered and entitled | true | true | executes handler |
### 18.5 Task — Composing Actions
```go
// Full Core — everything available
c := core.New(
core.WithService(process.Register), // registers process.* actions
core.WithService(agentic.Register), // registers agentic.* actions
core.WithService(brain.Register), // registers brain.* actions
)
// Sandboxed Core — no process, no brain
c := core.New(
core.WithService(agentic.Register), // only agentic.* actions
)
// c.Action("process.run").Run(...) → Result{OK: false}
// c.Action("brain.recall").Run(...) → Result{OK: false}
```
### 18.6 core.Task() — Composing Actions
A Task is a named sequence, chain, or graph of Actions. Think n8n nodes but in code.
```go
// Sequential chain — stops on first failure
c.Task("deploy", core.TaskDef{
Description: "Build, test, and deploy to production",
Steps: []core.Step{
{Action: "go.build", With: core.Options{...}},
{Action: "go.test", With: core.Options{...}},
{Action: "docker.push", With: core.Options{...}},
{Action: "ansible.deploy", With: core.Options{...}},
},
})
// Run the task
r := c.Task("deploy").Run(ctx, core.NewOptions(
core.Option{Key: "target", Value: "production"},
))
```
### 18.7 Task Composition Patterns
```go
// Chain — sequential, output of each feeds next
c.Task("review-pipeline", core.TaskDef{
Steps: []core.Step{
{Action: "agentic.dispatch", With: opts},
{Action: "agentic.verify", Input: "previous"}, // gets output of dispatch
{Action: "agentic.merge", Input: "previous"},
},
})
// Parallel — all run concurrently, wait for all
c.Task("multi-repo-sweep", core.TaskDef{
Parallel: []core.Step{
{Action: "agentic.dispatch", With: optsGoIO},
{Action: "agentic.dispatch", With: optsGoLog},
{Action: "agentic.dispatch", With: optsGoMCP},
},
})
// Conditional — branch on result
c.Task("qa-gate", core.TaskDef{
c.Task("deploy", core.Task{
Description: "Build, test, deploy",
Steps: []core.Step{
{Action: "go.build"},
{Action: "go.test"},
{
If: "previous.OK",
Then: core.Step{Action: "agentic.merge"},
Else: core.Step{Action: "agentic.flag-review"},
},
{Action: "docker.push"},
{Action: "ansible.deploy", Async: true}, // doesn't block
},
})
// Scheduled — run at a specific time or interval
c.Task("nightly-sweep", core.TaskDef{
Schedule: "0 2 * * *", // cron: 2am daily
Steps: []core.Step{
{Action: "agentic.scan"},
{Action: "agentic.dispatch-fixes", Input: "previous"},
},
})
r := c.Task("deploy").Run(ctx, c, opts)
```
### 18.8 How This Relates to Existing IPC
Sequential steps stop on first failure. `Async: true` steps fire without blocking.
`Input: "previous"` pipes last step's output to next step.
The current IPC verbs become invocation modes for Actions:
| Current | Becomes | Purpose |
|---------|---------|---------|
| `c.ACTION(msg)` | `c.Action("name").Broadcast(opts)` | Fire-and-forget to ALL handlers |
| `c.QUERY(q)` | `c.Action("name").Query(opts)` | First responder wins |
| `c.PERFORM(t)` | `c.Action("name").Run(opts)` | Execute and return result |
| `c.PerformAsync(t)` | `c.Action("name").RunAsync(opts)` | Background with progress |
The anonymous message types (`Message`, `Query`, `Task`) still work for backwards compatibility. Named Actions are the AX-native way forward.
### 18.9 How Process Fits
Section 17's `c.Process()` is syntactic sugar over Actions:
### 18.6 Background Execution
```go
// c.Process().Run(ctx, "git", "log") is equivalent to:
r := c.PerformAsync("agentic.dispatch", opts)
taskID := r.Value.(string)
// Broadcasts ActionTaskStarted, ActionTaskProgress, ActionTaskCompleted
c.Progress(taskID, 0.5, "halfway", "agentic.dispatch")
```
### 18.7 How Process Fits
`c.Process()` is sugar over Actions:
```go
c.Process().Run(ctx, "git", "log")
// equivalent to:
c.Action("process.run").Run(ctx, core.NewOptions(
core.Option{Key: "command", Value: "git"},
core.Option{Key: "args", Value: []string{"log"}},
))
// c.Process().Start(ctx, opts) is equivalent to:
c.Action("process.start").Run(ctx, core.NewOptions(
core.Option{Key: "command", Value: opts.Command},
core.Option{Key: "args", Value: opts.Args},
core.Option{Key: "detach", Value: true},
))
```
The `Process` primitive is a typed convenience layer. Under the hood, it's Actions all the way down.
### 18.10 Inspecting the Action Registry
```go
// List all registered actions
actions := c.Actions() // []string{"process.run", "process.start", "agentic.dispatch", ...}
// Check capabilities
c.Action("process.run").Exists() // true if go-process registered
c.Action("brain.recall").Exists() // true if brain registered
// Get action metadata
def := c.Action("agentic.dispatch").Def()
// def.Description = "Dispatch a subagent to work on a task"
// def.Schema = Options with expected keys
```
This makes the capability map queryable. An agent can inspect what Actions are available before attempting to use them.
---
## 19. API — Remote Streams
> Status: Design spec. The transport primitive for remote communication.
### 19.1 The Concept
HTTP is a stream. WebSocket is a stream. SSE is a stream. MCP over HTTP is a stream. The transport protocol is irrelevant to the consumer — you write bytes, you read bytes.
```
c.IPC() → local conclave (in-process, same binary)
c.API() → remote streams (cross-process, cross-machine)
c.Process() → managed execution (via IPC Actions)
```
IPC is local. API is remote. The consumer doesn't care which one resolves their Action — if `process.run` is local it goes through IPC, if it's on Charon it goes through API. Same Action name, same Result type.
### 19.2 The Primitive
Drive is the phone book (WHERE). API is the phone (HOW). Consumer packages register protocol handlers.
```go
// API is the Core primitive for remote communication.
// All remote transports are streams — the protocol is a detail.
type API struct {
core *Core
streams map[string]*Stream
// Configure endpoint in Drive
c.Drive().New(core.NewOptions(
core.Option{Key: "name", Value: "charon"},
core.Option{Key: "transport", Value: "http://10.69.69.165:9101/mcp"},
))
// Open stream — looks up Drive, finds protocol handler
r := c.API().Stream("charon")
if r.OK {
stream := r.Value.(core.Stream)
stream.Send(payload)
resp, _ := stream.Receive()
stream.Close()
}
// Accessor on Core
func (c *Core) API() *API { return c.api }
```
### 19.3 Streams
A Stream is a named, bidirectional connection to a remote endpoint. How it connects (HTTP, WebSocket, TCP, unix socket) is configured in `c.Drive()`.
### 19.1 Stream Interface
```go
// Open a stream to a named endpoint
s, err := c.API().Stream("charon")
// → looks up "charon" in c.Drive()
// → Drive has: transport="http://10.69.69.165:9101/mcp"
// → API opens HTTP connection, returns Stream
// Stream interface
type Stream interface {
Send(data []byte) error
Receive() ([]byte, error)
@ -895,126 +766,51 @@ type Stream interface {
}
```
### 19.4 Relationship to Drive
### 19.2 Protocol Handlers
`c.Drive()` holds the connection config. `c.API()` opens the actual streams.
Consumer packages register factories per URL scheme:
```go
// Drive holds WHERE to connect
c.Drive().New(core.NewOptions(
core.Option{Key: "name", Value: "charon"},
core.Option{Key: "transport", Value: "http://10.69.69.165:9101/mcp"},
core.Option{Key: "token", Value: agentToken},
))
// API handles HOW to connect
s, _ := c.API().Stream("charon") // reads config from Drive("charon")
s.Send(payload) // HTTP POST under the hood
resp, _ := s.Receive() // SSE/response parsing under the hood
// In a transport package's OnStartup:
c.API().RegisterProtocol("http", httpStreamFactory)
c.API().RegisterProtocol("mcp", mcpStreamFactory)
```
Drive is the phone book. API is the phone.
Resolution: `c.API().Stream("charon")` → Drive lookup → extract scheme → find factory → create Stream.
### 19.5 Protocol Handlers
No protocol handler = no capability.
Different transports register as protocol handlers — same pattern as Actions:
### 19.3 Remote Action Dispatch
Actions transparently cross machine boundaries via `host:action` syntax:
```go
// In a hypothetical core/http package:
func Register(c *core.Core) core.Result {
c.API().RegisterProtocol("http", httpStreamFactory)
c.API().RegisterProtocol("https", httpStreamFactory)
return core.Result{OK: true}
}
// Local
r := c.RemoteAction("agentic.status", ctx, opts)
// In core/mcp:
func Register(c *core.Core) core.Result {
c.API().RegisterProtocol("mcp", mcpStreamFactory)
return core.Result{OK: true}
}
// Remote — same API, different host
r := c.RemoteAction("charon:agentic.status", ctx, opts)
// → splits on ":" → endpoint="charon", action="agentic.status"
// → c.API().Call("charon", "agentic.status", opts)
// Web3 — Lethean dVPN routed
r := c.RemoteAction("snider.lthn:brain.recall", ctx, opts)
```
When `c.API().Stream("charon")` is called:
1. Look up "charon" in Drive → get transport URL
2. Parse protocol from URL → "http"
3. Find registered protocol handler → httpStreamFactory
4. Factory creates the Stream
No protocol handler = no capability. Same permission model as Process and Actions.
### 19.6 Remote Action Dispatch
The killer feature: Actions that transparently cross machine boundaries.
### 19.4 Direct Call
```go
// Local action — goes through IPC
c.Action("agentic.status").Run(ctx, opts)
// Remote action — goes through API
c.Action("charon:agentic.status").Run(ctx, opts)
// → splits on ":" → host="charon", action="agentic.status"
// → c.API().Stream("charon") → sends JSON-RPC call
// → remote core-agent handles it → result comes back
r := c.API().Call("charon", "agentic.dispatch", opts)
// Opens stream, sends JSON-RPC, receives response, closes stream
```
The current `dispatchRemote` function in core/agent does exactly this manually — builds MCP JSON-RPC, opens HTTP, parses SSE. With `c.API()`, it becomes one line.
### 19.7 Where This Already Exists (Partially)
The pieces are scattered across the ecosystem:
| Current | Becomes |
|---------|---------|
| `dispatchRemote` in core/agent — manual HTTP + SSE + MCP | `c.Action("charon:agentic.dispatch").Run(opts)` |
| `statusRemote` in core/agent — same manual HTTP | `c.Action("charon:agentic.status").Run(opts)` |
| `mcpInitialize` / `mcpCall` in core/agent — MCP handshake | `c.API().Stream("charon")` (MCP protocol handler) |
| `brainRecall` in core/agent — HTTP POST to brain API | `c.Action("brain.recall").Run(opts)` or `c.API().Stream("brain")` |
| Forge API calls — custom HTTP client | `c.API().Stream("forge")` |
| `DriveHandle.Transport` — stores URLs | `c.Drive()` already does this — API reads from it |
### 19.8 The Full Subsystem Map
```
c.Registry() — universal named collection (the brick all registries use)
c.Options() — input configuration (what was passed to New)
c.App() — identity (name, version)
c.Config() — runtime settings
c.Data() — embedded assets
c.Drive() — connection config (WHERE to reach things)
c.API() — remote streams (HOW to reach things)
c.Fs() — filesystem
c.Process() — managed execution
c.Action() — named callables (register, invoke, inspect)
c.IPC() — local message bus (consumes Action registry)
c.Cli() — command tree
c.Log() — logging
c.Error() — panic recovery
c.I18n() — internationalisation
```
14 subsystems. `c.Registry()` is the foundation — most other subsystems build on it.
---
## 20. Registry — The Universal Collection Primitive
> Status: Design spec. Extracts the pattern shared by 5+ existing registries.
Thread-safe named collection. The brick all registries build on.
### 20.1 The Problem
Core has multiple independent registry implementations that all do the same thing:
```
serviceRegistry — map[string]*Service + mutex + locked
commandRegistry — map[string]*Command + mutex
Ipc handlers — []func + mutex
Drive — map[string]*DriveHandle + mutex
Data — map[string]*Embed
```
Five registries, five implementations of: named map + thread safety + optional locking.
### 20.2 The Primitive
### 20.1 The Type
```go
// Registry is a thread-safe named collection. The universal brick
@ -1082,53 +878,17 @@ c.Registry("actions").Get("process.run") // universal access
The typed accessors stay — they're ergonomic and type-safe. `c.Registry()` adds the universal query layer on top.
### 20.6 Why This Matters for IPC
### 20.6 What Embeds Registry
This resolves Issue 6 (serviceRegistry unexported) and Issue 12 (Ipc data-only struct) cleanly:
All named collections in Core embed `Registry[T]`:
**IPC is safe to expose Actions and Handlers** because it doesn't control the write path. The Registry does:
```go
// IPC reads from the registry (safe, read-only)
c.IPC().Actions() // reads c.Registry("actions").Names()
c.IPC().Handlers() // reads c.Registry("handlers").Len()
// Registration goes through the primitive (controlled)
c.Action("process.run", handler) // writes to c.Registry("actions")
// Locking goes through the primitive
c.Registry("actions").Lock() // no more registration after startup
```
IPC is a consumer of the registry, not the owner of the data. The Registry primitive owns the data. This is the separation that makes it safe to export everything.
### 20.7 ServiceRegistry and CommandRegistry Become Exported
With `Registry[T]` as the brick:
```go
type ServiceRegistry struct {
*Registry[*Service]
}
type CommandRegistry struct {
*Registry[*Command]
}
```
These are now exported types — consumers can extend service management and command routing. But they can't bypass the lock because `Registry.Set()` checks `locked`. The primitive enforces the contract.
### 20.8 What This Replaces
| Current | Becomes |
|---------|---------|
| `serviceRegistry` (unexported, custom map+mutex) | `ServiceRegistry` embedding `Registry[*Service]` |
| `commandRegistry` (unexported, custom map+mutex) | `CommandRegistry` embedding `Registry[*Command]` |
| `Drive.handles` (internal map+mutex) | `Drive` embedding `Registry[*DriveHandle]` |
| `Data.mounts` (internal map) | `Data` embedding `Registry[*Embed]` |
| `Ipc.ipcHandlers` (internal slice+mutex) | `Registry[ActionHandler]` in IPC |
| 5 separate lock implementations | One `Registry.Lock()` / `Registry.Locked()` |
| `WithServiceLock()` | `c.Registry("services").Lock()` |
- `ServiceRegistry``Registry[*Service]`
- `CommandRegistry``Registry[*Command]`
- `Drive``Registry[*DriveHandle]`
- `Data``Registry[*Embed]`
- `Lock.locks``Registry[*sync.RWMutex]`
- `IPC.actions``Registry[*Action]`
- `IPC.tasks``Registry[*Task]`
---