diff --git a/docs/RFC.md b/docs/RFC.md index e09fd5e..58d2de9 100644 --- a/docs/RFC.md +++ b/docs/RFC.md @@ -626,268 +626,139 @@ c.Process().Run(ctx, "git", "log") // executes, returns output ## 18. Action and Task — The Execution Primitives -> Status: Design spec. Replaces the current `ACTION`/`PERFORM` broadcast model -> with named, composable execution units. +An Action is a named, registered callable. A Task is a composed sequence of Actions. -### 18.1 The Concept - -The current IPC has three verbs: -- `ACTION(msg)` — broadcast fire-and-forget -- `QUERY(q)` — first responder wins -- `PERFORM(t)` — first executor wins - -This works but treats everything as anonymous messages. There's no way to: -- Name a callable and invoke it by name -- Chain callables into flows -- Schedule a callable for later -- Inspect what callables are registered - -**Action** is the fix. An Action is a named, registered callable. The atomic unit of work in Core. - -### 18.2 core.Action() — The Atomic Unit +### 18.1 Action — The Atomic Unit ```go -// Register a named action +// Register c.Action("git.log", func(ctx context.Context, opts core.Options) core.Result { dir := opts.String("dir") - return c.Process().RunIn(ctx, dir, "git", "log", "--oneline", "-20") + return c.Process().RunIn(ctx, dir, "git", "log", "--oneline") }) -// Invoke by name +// Invoke r := c.Action("git.log").Run(ctx, core.NewOptions( - core.Option{Key: "dir", Value: "/path/to/repo"}, + core.Option{Key: "dir", Value: "/repo"}, )) -if r.OK { - log := r.Value.(string) -} -// Check if an action exists (permission check) -if c.Action("process.run").Exists() { - // process capability is available -} +// Check capability +c.Action("process.run").Exists() // true if go-process registered + +// List all +c.Actions() // []string{"process.run", "agentic.dispatch", ...} ``` -`c.Action(name)` is dual-purpose like `c.Service(name)`: -- With a handler arg → registers the action -- Without → returns the action for invocation +`c.Action(name)` is dual-purpose: with handler arg → register; without → return for invocation. -### 18.3 Action Signature +### 18.2 Action Type ```go -// ActionHandler is the function signature for all actions. type ActionHandler func(context.Context, Options) Result -// ActionDef is a registered action. -type ActionDef struct { +type Action struct { Name string Handler ActionHandler - Description string // AX: human + agent readable - Schema Options // declares expected input keys (optional) + Description string + Schema Options // expected input keys } ``` -### 18.4 Where Actions Come From +`Action.Run()` has panic recovery and entitlement checking (Section 21) built in. -Services register their actions during `OnStartup`. This is the same pattern as command registration — services own their capabilities: +### 18.3 Where Actions Come From + +Services register during `OnStartup`: ```go -func (s *MyService) OnStartup(ctx context.Context) error { +func (s *MyService) OnStartup(ctx context.Context) core.Result { c := s.Core() - c.Action("process.run", s.handleRun) - c.Action("process.start", s.handleStart) - c.Action("process.kill", s.handleKill) - c.Action("git.clone", s.handleGitClone) - c.Action("git.push", s.handleGitPush) - - return nil + return core.Result{OK: true} } ``` -go-process registers `process.*` actions. core/agent registers `agentic.*` actions. The action namespace IS the capability map. +The action namespace IS the capability map. go-process registers `process.*`, core/agent registers `agentic.*`. -### 18.5 The Permission Model +### 18.4 Permission Model -If `process.run` is not registered, calling it returns `Result{OK: false}`. This is the same "registration IS permission" model from Section 17.7, but generalised to ALL capabilities: +Three states for any action: + +| State | `Exists()` | `Entitled()` | `Run()` | +|-------|-----------|-------------|---------| +| Not registered | false | — | `Result{OK: false}` not registered | +| Registered, not entitled | true | false | `Result{OK: false}` not entitled | +| Registered and entitled | true | true | executes handler | + +### 18.5 Task — Composing Actions ```go -// Full Core — everything available -c := core.New( - core.WithService(process.Register), // registers process.* actions - core.WithService(agentic.Register), // registers agentic.* actions - core.WithService(brain.Register), // registers brain.* actions -) - -// Sandboxed Core — no process, no brain -c := core.New( - core.WithService(agentic.Register), // only agentic.* actions -) -// c.Action("process.run").Run(...) → Result{OK: false} -// c.Action("brain.recall").Run(...) → Result{OK: false} -``` - -### 18.6 core.Task() — Composing Actions - -A Task is a named sequence, chain, or graph of Actions. Think n8n nodes but in code. - -```go -// Sequential chain — stops on first failure -c.Task("deploy", core.TaskDef{ - Description: "Build, test, and deploy to production", - Steps: []core.Step{ - {Action: "go.build", With: core.Options{...}}, - {Action: "go.test", With: core.Options{...}}, - {Action: "docker.push", With: core.Options{...}}, - {Action: "ansible.deploy", With: core.Options{...}}, - }, -}) - -// Run the task -r := c.Task("deploy").Run(ctx, core.NewOptions( - core.Option{Key: "target", Value: "production"}, -)) -``` - -### 18.7 Task Composition Patterns - -```go -// Chain — sequential, output of each feeds next -c.Task("review-pipeline", core.TaskDef{ - Steps: []core.Step{ - {Action: "agentic.dispatch", With: opts}, - {Action: "agentic.verify", Input: "previous"}, // gets output of dispatch - {Action: "agentic.merge", Input: "previous"}, - }, -}) - -// Parallel — all run concurrently, wait for all -c.Task("multi-repo-sweep", core.TaskDef{ - Parallel: []core.Step{ - {Action: "agentic.dispatch", With: optsGoIO}, - {Action: "agentic.dispatch", With: optsGoLog}, - {Action: "agentic.dispatch", With: optsGoMCP}, - }, -}) - -// Conditional — branch on result -c.Task("qa-gate", core.TaskDef{ +c.Task("deploy", core.Task{ + Description: "Build, test, deploy", Steps: []core.Step{ + {Action: "go.build"}, {Action: "go.test"}, - { - If: "previous.OK", - Then: core.Step{Action: "agentic.merge"}, - Else: core.Step{Action: "agentic.flag-review"}, - }, + {Action: "docker.push"}, + {Action: "ansible.deploy", Async: true}, // doesn't block }, }) -// Scheduled — run at a specific time or interval -c.Task("nightly-sweep", core.TaskDef{ - Schedule: "0 2 * * *", // cron: 2am daily - Steps: []core.Step{ - {Action: "agentic.scan"}, - {Action: "agentic.dispatch-fixes", Input: "previous"}, - }, -}) +r := c.Task("deploy").Run(ctx, c, opts) ``` -### 18.8 How This Relates to Existing IPC +Sequential steps stop on first failure. `Async: true` steps fire without blocking. +`Input: "previous"` pipes last step's output to next step. -The current IPC verbs become invocation modes for Actions: - -| Current | Becomes | Purpose | -|---------|---------|---------| -| `c.ACTION(msg)` | `c.Action("name").Broadcast(opts)` | Fire-and-forget to ALL handlers | -| `c.QUERY(q)` | `c.Action("name").Query(opts)` | First responder wins | -| `c.PERFORM(t)` | `c.Action("name").Run(opts)` | Execute and return result | -| `c.PerformAsync(t)` | `c.Action("name").RunAsync(opts)` | Background with progress | - -The anonymous message types (`Message`, `Query`, `Task`) still work for backwards compatibility. Named Actions are the AX-native way forward. - -### 18.9 How Process Fits - -Section 17's `c.Process()` is syntactic sugar over Actions: +### 18.6 Background Execution ```go -// c.Process().Run(ctx, "git", "log") is equivalent to: +r := c.PerformAsync("agentic.dispatch", opts) +taskID := r.Value.(string) + +// Broadcasts ActionTaskStarted, ActionTaskProgress, ActionTaskCompleted +c.Progress(taskID, 0.5, "halfway", "agentic.dispatch") +``` + +### 18.7 How Process Fits + +`c.Process()` is sugar over Actions: + +```go +c.Process().Run(ctx, "git", "log") +// equivalent to: c.Action("process.run").Run(ctx, core.NewOptions( core.Option{Key: "command", Value: "git"}, core.Option{Key: "args", Value: []string{"log"}}, )) - -// c.Process().Start(ctx, opts) is equivalent to: -c.Action("process.start").Run(ctx, core.NewOptions( - core.Option{Key: "command", Value: opts.Command}, - core.Option{Key: "args", Value: opts.Args}, - core.Option{Key: "detach", Value: true}, -)) ``` -The `Process` primitive is a typed convenience layer. Under the hood, it's Actions all the way down. - -### 18.10 Inspecting the Action Registry - -```go -// List all registered actions -actions := c.Actions() // []string{"process.run", "process.start", "agentic.dispatch", ...} - -// Check capabilities -c.Action("process.run").Exists() // true if go-process registered -c.Action("brain.recall").Exists() // true if brain registered - -// Get action metadata -def := c.Action("agentic.dispatch").Def() -// def.Description = "Dispatch a subagent to work on a task" -// def.Schema = Options with expected keys -``` - -This makes the capability map queryable. An agent can inspect what Actions are available before attempting to use them. - --- ## 19. API — Remote Streams -> Status: Design spec. The transport primitive for remote communication. - -### 19.1 The Concept - -HTTP is a stream. WebSocket is a stream. SSE is a stream. MCP over HTTP is a stream. The transport protocol is irrelevant to the consumer — you write bytes, you read bytes. - -``` -c.IPC() → local conclave (in-process, same binary) -c.API() → remote streams (cross-process, cross-machine) -c.Process() → managed execution (via IPC Actions) -``` - -IPC is local. API is remote. The consumer doesn't care which one resolves their Action — if `process.run` is local it goes through IPC, if it's on Charon it goes through API. Same Action name, same Result type. - -### 19.2 The Primitive +Drive is the phone book (WHERE). API is the phone (HOW). Consumer packages register protocol handlers. ```go -// API is the Core primitive for remote communication. -// All remote transports are streams — the protocol is a detail. -type API struct { - core *Core - streams map[string]*Stream +// Configure endpoint in Drive +c.Drive().New(core.NewOptions( + core.Option{Key: "name", Value: "charon"}, + core.Option{Key: "transport", Value: "http://10.69.69.165:9101/mcp"}, +)) + +// Open stream — looks up Drive, finds protocol handler +r := c.API().Stream("charon") +if r.OK { + stream := r.Value.(core.Stream) + stream.Send(payload) + resp, _ := stream.Receive() + stream.Close() } - -// Accessor on Core -func (c *Core) API() *API { return c.api } ``` -### 19.3 Streams - -A Stream is a named, bidirectional connection to a remote endpoint. How it connects (HTTP, WebSocket, TCP, unix socket) is configured in `c.Drive()`. +### 19.1 Stream Interface ```go -// Open a stream to a named endpoint -s, err := c.API().Stream("charon") -// → looks up "charon" in c.Drive() -// → Drive has: transport="http://10.69.69.165:9101/mcp" -// → API opens HTTP connection, returns Stream - -// Stream interface type Stream interface { Send(data []byte) error Receive() ([]byte, error) @@ -895,126 +766,51 @@ type Stream interface { } ``` -### 19.4 Relationship to Drive +### 19.2 Protocol Handlers -`c.Drive()` holds the connection config. `c.API()` opens the actual streams. +Consumer packages register factories per URL scheme: ```go -// Drive holds WHERE to connect -c.Drive().New(core.NewOptions( - core.Option{Key: "name", Value: "charon"}, - core.Option{Key: "transport", Value: "http://10.69.69.165:9101/mcp"}, - core.Option{Key: "token", Value: agentToken}, -)) - -// API handles HOW to connect -s, _ := c.API().Stream("charon") // reads config from Drive("charon") -s.Send(payload) // HTTP POST under the hood -resp, _ := s.Receive() // SSE/response parsing under the hood +// In a transport package's OnStartup: +c.API().RegisterProtocol("http", httpStreamFactory) +c.API().RegisterProtocol("mcp", mcpStreamFactory) ``` -Drive is the phone book. API is the phone. +Resolution: `c.API().Stream("charon")` → Drive lookup → extract scheme → find factory → create Stream. -### 19.5 Protocol Handlers +No protocol handler = no capability. -Different transports register as protocol handlers — same pattern as Actions: +### 19.3 Remote Action Dispatch + +Actions transparently cross machine boundaries via `host:action` syntax: ```go -// In a hypothetical core/http package: -func Register(c *core.Core) core.Result { - c.API().RegisterProtocol("http", httpStreamFactory) - c.API().RegisterProtocol("https", httpStreamFactory) - return core.Result{OK: true} -} +// Local +r := c.RemoteAction("agentic.status", ctx, opts) -// In core/mcp: -func Register(c *core.Core) core.Result { - c.API().RegisterProtocol("mcp", mcpStreamFactory) - return core.Result{OK: true} -} +// Remote — same API, different host +r := c.RemoteAction("charon:agentic.status", ctx, opts) +// → splits on ":" → endpoint="charon", action="agentic.status" +// → c.API().Call("charon", "agentic.status", opts) + +// Web3 — Lethean dVPN routed +r := c.RemoteAction("snider.lthn:brain.recall", ctx, opts) ``` -When `c.API().Stream("charon")` is called: -1. Look up "charon" in Drive → get transport URL -2. Parse protocol from URL → "http" -3. Find registered protocol handler → httpStreamFactory -4. Factory creates the Stream - -No protocol handler = no capability. Same permission model as Process and Actions. - -### 19.6 Remote Action Dispatch - -The killer feature: Actions that transparently cross machine boundaries. +### 19.4 Direct Call ```go -// Local action — goes through IPC -c.Action("agentic.status").Run(ctx, opts) - -// Remote action — goes through API -c.Action("charon:agentic.status").Run(ctx, opts) -// → splits on ":" → host="charon", action="agentic.status" -// → c.API().Stream("charon") → sends JSON-RPC call -// → remote core-agent handles it → result comes back +r := c.API().Call("charon", "agentic.dispatch", opts) +// Opens stream, sends JSON-RPC, receives response, closes stream ``` -The current `dispatchRemote` function in core/agent does exactly this manually — builds MCP JSON-RPC, opens HTTP, parses SSE. With `c.API()`, it becomes one line. - -### 19.7 Where This Already Exists (Partially) - -The pieces are scattered across the ecosystem: - -| Current | Becomes | -|---------|---------| -| `dispatchRemote` in core/agent — manual HTTP + SSE + MCP | `c.Action("charon:agentic.dispatch").Run(opts)` | -| `statusRemote` in core/agent — same manual HTTP | `c.Action("charon:agentic.status").Run(opts)` | -| `mcpInitialize` / `mcpCall` in core/agent — MCP handshake | `c.API().Stream("charon")` (MCP protocol handler) | -| `brainRecall` in core/agent — HTTP POST to brain API | `c.Action("brain.recall").Run(opts)` or `c.API().Stream("brain")` | -| Forge API calls — custom HTTP client | `c.API().Stream("forge")` | -| `DriveHandle.Transport` — stores URLs | `c.Drive()` already does this — API reads from it | - -### 19.8 The Full Subsystem Map - -``` -c.Registry() — universal named collection (the brick all registries use) -c.Options() — input configuration (what was passed to New) -c.App() — identity (name, version) -c.Config() — runtime settings -c.Data() — embedded assets -c.Drive() — connection config (WHERE to reach things) -c.API() — remote streams (HOW to reach things) -c.Fs() — filesystem -c.Process() — managed execution -c.Action() — named callables (register, invoke, inspect) -c.IPC() — local message bus (consumes Action registry) -c.Cli() — command tree -c.Log() — logging -c.Error() — panic recovery -c.I18n() — internationalisation -``` - -14 subsystems. `c.Registry()` is the foundation — most other subsystems build on it. - --- ## 20. Registry — The Universal Collection Primitive -> Status: Design spec. Extracts the pattern shared by 5+ existing registries. +Thread-safe named collection. The brick all registries build on. -### 20.1 The Problem - -Core has multiple independent registry implementations that all do the same thing: - -``` -serviceRegistry — map[string]*Service + mutex + locked -commandRegistry — map[string]*Command + mutex -Ipc handlers — []func + mutex -Drive — map[string]*DriveHandle + mutex -Data — map[string]*Embed -``` - -Five registries, five implementations of: named map + thread safety + optional locking. - -### 20.2 The Primitive +### 20.1 The Type ```go // Registry is a thread-safe named collection. The universal brick @@ -1082,53 +878,17 @@ c.Registry("actions").Get("process.run") // universal access The typed accessors stay — they're ergonomic and type-safe. `c.Registry()` adds the universal query layer on top. -### 20.6 Why This Matters for IPC +### 20.6 What Embeds Registry -This resolves Issue 6 (serviceRegistry unexported) and Issue 12 (Ipc data-only struct) cleanly: +All named collections in Core embed `Registry[T]`: -**IPC is safe to expose Actions and Handlers** because it doesn't control the write path. The Registry does: - -```go -// IPC reads from the registry (safe, read-only) -c.IPC().Actions() // reads c.Registry("actions").Names() -c.IPC().Handlers() // reads c.Registry("handlers").Len() - -// Registration goes through the primitive (controlled) -c.Action("process.run", handler) // writes to c.Registry("actions") - -// Locking goes through the primitive -c.Registry("actions").Lock() // no more registration after startup -``` - -IPC is a consumer of the registry, not the owner of the data. The Registry primitive owns the data. This is the separation that makes it safe to export everything. - -### 20.7 ServiceRegistry and CommandRegistry Become Exported - -With `Registry[T]` as the brick: - -```go -type ServiceRegistry struct { - *Registry[*Service] -} - -type CommandRegistry struct { - *Registry[*Command] -} -``` - -These are now exported types — consumers can extend service management and command routing. But they can't bypass the lock because `Registry.Set()` checks `locked`. The primitive enforces the contract. - -### 20.8 What This Replaces - -| Current | Becomes | -|---------|---------| -| `serviceRegistry` (unexported, custom map+mutex) | `ServiceRegistry` embedding `Registry[*Service]` | -| `commandRegistry` (unexported, custom map+mutex) | `CommandRegistry` embedding `Registry[*Command]` | -| `Drive.handles` (internal map+mutex) | `Drive` embedding `Registry[*DriveHandle]` | -| `Data.mounts` (internal map) | `Data` embedding `Registry[*Embed]` | -| `Ipc.ipcHandlers` (internal slice+mutex) | `Registry[ActionHandler]` in IPC | -| 5 separate lock implementations | One `Registry.Lock()` / `Registry.Locked()` | -| `WithServiceLock()` | `c.Registry("services").Lock()` | +- `ServiceRegistry` → `Registry[*Service]` +- `CommandRegistry` → `Registry[*Command]` +- `Drive` → `Registry[*DriveHandle]` +- `Data` → `Registry[*Embed]` +- `Lock.locks` → `Registry[*sync.RWMutex]` +- `IPC.actions` → `Registry[*Action]` +- `IPC.tasks` → `Registry[*Task]` ---