fix: remove type Task any — untyped IPC replaced by named Actions

The old IPC Task system passed `any` through TaskHandler and
PerformAsync. Now that named Actions exist with typed signatures
(ActionHandler func(context.Context, Options) Result), the untyped
layer is dead weight.

Changes:
- type Task any removed (was in contract.go)
- type Task struct is now the composed sequence (action.go)
- PerformAsync takes (action string, opts Options) not (t Task)
- TaskHandler type removed — use c.Action("name", handler)
- RegisterTask removed — use c.Action("name", handler)
- PERFORM sugar removed — use c.Action("name").Run()
- ActionTaskStarted/Progress/Completed carry typed fields
  (Action string, Options, Result) not any

ActionDef → Action rename also in this commit (same principle:
DTOs don't have Run() methods).

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Snider 2026-03-25 15:57:36 +00:00
parent c5c16a7a21
commit 028ec84c5e
9 changed files with 96 additions and 133 deletions

View file

@ -122,9 +122,9 @@ type Step struct {
Input string // "previous" = output of last step piped as input Input string // "previous" = output of last step piped as input
} }
// TaskDef is a named sequence of Steps. // Task is a named sequence of Steps.
// //
// c.Task("agent.completion", core.TaskDef{ // c.Task("agent.completion", core.Task{
// Steps: []core.Step{ // Steps: []core.Step{
// {Action: "agentic.qa"}, // {Action: "agentic.qa"},
// {Action: "agentic.auto-pr"}, // {Action: "agentic.auto-pr"},
@ -132,7 +132,7 @@ type Step struct {
// {Action: "agentic.poke", Async: true}, // {Action: "agentic.poke", Async: true},
// }, // },
// }) // })
type TaskDef struct { type Task struct {
Name string Name string
Description string Description string
Steps []Step Steps []Step
@ -143,7 +143,7 @@ type TaskDef struct {
// The "previous" input pipes the last sync step's output to the next step. // The "previous" input pipes the last sync step's output to the next step.
// //
// r := c.Task("deploy").Run(ctx, opts) // r := c.Task("deploy").Run(ctx, opts)
func (t *TaskDef) Run(ctx context.Context, c *Core, opts Options) Result { func (t *Task) Run(ctx context.Context, c *Core, opts Options) Result {
if t == nil || len(t.Steps) == 0 { if t == nil || len(t.Steps) == 0 {
return Result{E("task.Run", Concat("task has no steps: ", t.safeName()), nil), false} return Result{E("task.Run", Concat("task has no steps: ", t.safeName()), nil), false}
} }
@ -187,7 +187,7 @@ func (t *TaskDef) Run(ctx context.Context, c *Core, opts Options) Result {
return lastResult return lastResult
} }
func (t *TaskDef) safeName() string { func (t *Task) safeName() string {
if t == nil { if t == nil {
return "<nil>" return "<nil>"
} }
@ -201,12 +201,12 @@ func stepOptions(step Step) Options {
} }
// Task gets or registers a named task. // Task gets or registers a named task.
// With a TaskDef argument: registers the task. // With a Task argument: registers the task.
// Without: returns the task for invocation. // Without: returns the task for invocation.
// //
// c.Task("deploy", core.TaskDef{Steps: steps}) // register // c.Task("deploy", core.Task{Steps: steps}) // register
// c.Task("deploy").Run(ctx, c, opts) // invoke // c.Task("deploy").Run(ctx, c, opts) // invoke
func (c *Core) Task(name string, def ...TaskDef) *TaskDef { func (c *Core) Task(name string, def ...Task) *Task {
if len(def) > 0 { if len(def) > 0 {
d := def[0] d := def[0]
d.Name = name d.Name = name
@ -215,9 +215,9 @@ func (c *Core) Task(name string, def ...TaskDef) *TaskDef {
} }
r := c.ipc.tasks.Get(name) r := c.ipc.tasks.Get(name)
if !r.OK { if !r.OK {
return &TaskDef{Name: name} return &Task{Name: name}
} }
return r.Value.(*TaskDef) return r.Value.(*Task)
} }
// Tasks returns all registered task names. // Tasks returns all registered task names.

View file

@ -153,7 +153,7 @@ func TestAction_Task_Good_Sequential(t *testing.T) {
return Result{Value: "output-b", OK: true} return Result{Value: "output-b", OK: true}
}) })
c.Task("pipeline", TaskDef{ c.Task("pipeline", Task{
Steps: []Step{ Steps: []Step{
{Action: "step.a"}, {Action: "step.a"},
{Action: "step.b"}, {Action: "step.b"},
@ -182,7 +182,7 @@ func TestAction_Task_Bad_StepFails(t *testing.T) {
return Result{OK: true} return Result{OK: true}
}) })
c.Task("broken", TaskDef{ c.Task("broken", Task{
Steps: []Step{ Steps: []Step{
{Action: "step.ok"}, {Action: "step.ok"},
{Action: "step.fail"}, {Action: "step.fail"},
@ -197,7 +197,7 @@ func TestAction_Task_Bad_StepFails(t *testing.T) {
func TestAction_Task_Bad_MissingAction(t *testing.T) { func TestAction_Task_Bad_MissingAction(t *testing.T) {
c := New() c := New()
c.Task("missing", TaskDef{ c.Task("missing", Task{
Steps: []Step{ Steps: []Step{
{Action: "nonexistent"}, {Action: "nonexistent"},
}, },
@ -219,7 +219,7 @@ func TestAction_Task_Good_PreviousInput(t *testing.T) {
return Result{Value: "got: " + input.Value.(string), OK: true} return Result{Value: "got: " + input.Value.(string), OK: true}
}) })
c.Task("pipe", TaskDef{ c.Task("pipe", Task{
Steps: []Step{ Steps: []Step{
{Action: "produce"}, {Action: "produce"},
{Action: "consume", Input: "previous"}, {Action: "consume", Input: "previous"},
@ -233,14 +233,14 @@ func TestAction_Task_Good_PreviousInput(t *testing.T) {
func TestAction_Task_Ugly_EmptySteps(t *testing.T) { func TestAction_Task_Ugly_EmptySteps(t *testing.T) {
c := New() c := New()
c.Task("empty", TaskDef{}) c.Task("empty", Task{})
r := c.Task("empty").Run(context.Background(), c, NewOptions()) r := c.Task("empty").Run(context.Background(), c, NewOptions())
assert.False(t, r.OK) assert.False(t, r.OK)
} }
func TestAction_Tasks_Good(t *testing.T) { func TestAction_Tasks_Good(t *testing.T) {
c := New() c := New()
c.Task("deploy", TaskDef{Steps: []Step{{Action: "x"}}}) c.Task("deploy", Task{Steps: []Step{{Action: "x"}}})
c.Task("review", TaskDef{Steps: []Step{{Action: "y"}}}) c.Task("review", Task{Steps: []Step{{Action: "y"}}})
assert.Equal(t, []string{"deploy", "review"}, c.Tasks()) assert.Equal(t, []string{"deploy", "review"}, c.Tasks())
} }

View file

@ -16,22 +16,9 @@ type Message any
// Query is the type for read-only IPC requests. // Query is the type for read-only IPC requests.
type Query any type Query any
// Task is the type for IPC requests that perform side effects.
type Task any
// TaskWithIdentifier is an optional interface for tasks that need to know their assigned identifier.
type TaskWithIdentifier interface {
Task
SetTaskIdentifier(id string)
GetTaskIdentifier() string
}
// QueryHandler handles Query requests. Returns Result{Value, OK}. // QueryHandler handles Query requests. Returns Result{Value, OK}.
type QueryHandler func(*Core, Query) Result type QueryHandler func(*Core, Query) Result
// TaskHandler handles Task requests. Returns Result{Value, OK}.
type TaskHandler func(*Core, Task) Result
// Startable is implemented by services that need startup initialisation. // Startable is implemented by services that need startup initialisation.
// //
// func (s *MyService) OnStartup(ctx context.Context) core.Result { // func (s *MyService) OnStartup(ctx context.Context) core.Result {
@ -57,21 +44,21 @@ type ActionServiceShutdown struct{}
type ActionTaskStarted struct { type ActionTaskStarted struct {
TaskIdentifier string TaskIdentifier string
Task Task Action string
Options Options
} }
type ActionTaskProgress struct { type ActionTaskProgress struct {
TaskIdentifier string TaskIdentifier string
Task Task Action string
Progress float64 Progress float64
Message string Message string
} }
type ActionTaskCompleted struct { type ActionTaskCompleted struct {
TaskIdentifier string TaskIdentifier string
Task Task Action string
Result any Result Result
Error error
} }
// --- Constructor --- // --- Constructor ---
@ -106,7 +93,7 @@ func New(opts ...CoreOption) *Core {
error: &ErrorPanic{}, error: &ErrorPanic{},
log: &ErrorLog{}, log: &ErrorLog{},
lock: &Lock{locks: NewRegistry[*sync.RWMutex]()}, lock: &Lock{locks: NewRegistry[*sync.RWMutex]()},
ipc: &Ipc{actions: NewRegistry[*Action](), tasks: NewRegistry[*TaskDef]()}, ipc: &Ipc{actions: NewRegistry[*Action](), tasks: NewRegistry[*Task]()},
info: systemInfo, info: systemInfo,
i18n: &I18n{}, i18n: &I18n{},
services: &ServiceRegistry{Registry: NewRegistry[*Service]()}, services: &ServiceRegistry{Registry: NewRegistry[*Service]()},

View file

@ -108,7 +108,6 @@ func (c *Core) Run() {
func (c *Core) ACTION(msg Message) Result { return c.broadcast(msg) } func (c *Core) ACTION(msg Message) Result { return c.broadcast(msg) }
func (c *Core) QUERY(q Query) Result { return c.Query(q) } func (c *Core) QUERY(q Query) Result { return c.Query(q) }
func (c *Core) QUERYALL(q Query) Result { return c.QueryAll(q) } func (c *Core) QUERYALL(q Query) Result { return c.QueryAll(q) }
func (c *Core) PERFORM(t Task) Result { return c.Perform(t) }
// --- Error+Log --- // --- Error+Log ---

View file

@ -2540,6 +2540,8 @@ c.Task("agent-completion-pipeline", core.TaskDef{
The Task executor runs steps in order, with `Async: true` steps dispatched in parallel. Ingest and Poke don't wait for the pipeline — they fire immediately. The pipeline has a timeout. Each step has its own error handling. The Task executor runs steps in order, with `Async: true` steps dispatched in parallel. Ingest and Poke don't wait for the pipeline — they fire immediately. The pipeline has a timeout. Each step has its own error handling.
**Status (2026-03-25):** core/go `TaskDef` with `Steps` is implemented and tested. The core/agent wiring is documented in `core/agent/docs/plans/2026-03-25-core-go-v0.8.0-migration.md` Priority 5.
### P6-2. Every Handler Receives Every Message — O(handlers × messages) ### P6-2. Every Handler Receives Every Message — O(handlers × messages)
All 5 handlers are called for every ACTION. Each handler type-checks and skips if it's not their message. With N handlers and M message types, this is O(N×M) per event — every handler processes every message even if it only cares about one type. All 5 handlers are called for every ACTION. Each handler type-checks and skips if it's not their message. With N handlers and M message types, this is O(N×M) per event — every handler processes every message even if it only cares about one type.
@ -3693,7 +3695,7 @@ The meta-assumption: this RFC is complete. It's not. It's the best single-sessio
**This is by design for v0.8.0.** All services are first-party trusted code. The Lego Bricks philosophy says "export everything." The tension is: Lego Bricks vs Least Privilege. **This is by design for v0.8.0.** All services are first-party trusted code. The Lego Bricks philosophy says "export everything." The tension is: Lego Bricks vs Least Privilege.
**Resolution for v0.9.0+:** Entitlements, not CoreView. The boundary system already exists in CorePHP (RFC-004: Entitlements). Port it: **Resolution:** Section 21 (Entitlement primitive) — designed, implementation pending. Brought forward from v0.9.0 to v0.8.0. Port RFC-004 concept:
``` ```
Registration = capability ("process.run action exists") Registration = capability ("process.run action exists")
@ -3715,7 +3717,7 @@ IPC dispatch is synchronous. Startup is synchronous. File I/O assumes no concurr
**The cascade (P6-1) is the symptom.** The root cause is that Core was designed for sequential execution and concurrency was added incrementally without revisiting the foundations. **The cascade (P6-1) is the symptom.** The root cause is that Core was designed for sequential execution and concurrency was added incrementally without revisiting the foundations.
**Resolution:** The Action/Task system (Section 18) is the fix. Actions execute with concurrency control. Tasks define parallel/sequential composition. The IPC bus stops being the execution engine — it becomes the notification channel. PERFORM replaces ACTION for request/response. Async is opt-in per Action, not per handler. **Resolution:** The Action/Task system (Section 18) is implemented in core/go. `TaskDef` with `Steps` supports sequential chains, async dispatch, and previous-input piping. The cascade fix requires core/agent to wire its handlers as named Actions and replace the nested `c.ACTION()` calls with `c.Task("agent.completion").Run()`. See `core/agent/docs/plans/2026-03-25-core-go-v0.8.0-migration.md` Priority 5.
### Root Cause 4: No Recovery Path — 10 findings ### Root Cause 4: No Recovery Path — 10 findings

11
ipc.go
View file

@ -21,11 +21,8 @@ type Ipc struct {
queryMu sync.RWMutex queryMu sync.RWMutex
queryHandlers []QueryHandler queryHandlers []QueryHandler
taskMu sync.RWMutex
taskHandlers []TaskHandler
actions *Registry[*Action] // named action registry actions *Registry[*Action] // named action registry
tasks *Registry[*TaskDef] // named task registry tasks *Registry[*Task] // named task registry
} }
// broadcast dispatches a message to all registered IPC handlers. // broadcast dispatches a message to all registered IPC handlers.
@ -104,9 +101,3 @@ func (c *Core) RegisterActions(handlers ...func(*Core, Message) Result) {
c.ipc.ipcMu.Unlock() c.ipc.ipcMu.Unlock()
} }
// RegisterTask registers a handler for PERFORM task dispatch.
func (c *Core) RegisterTask(handler TaskHandler) {
c.ipc.taskMu.Lock()
c.ipc.taskHandlers = append(c.ipc.taskHandlers, handler)
c.ipc.taskMu.Unlock()
}

View file

@ -1,6 +1,7 @@
package core_test package core_test
import ( import (
"context"
"testing" "testing"
. "dappco.re/go/core" . "dappco.re/go/core"
@ -128,17 +129,14 @@ func TestIpc_QueryAll_Good(t *testing.T) {
assert.Contains(t, results, "b") assert.Contains(t, results, "b")
} }
// --- IPC: Tasks --- // --- IPC: Named Action Invocation ---
func TestIpc_Perform_Good(t *testing.T) { func TestIpc_ActionInvoke_Good(t *testing.T) {
c := New() c := New()
c.RegisterTask(func(_ *Core, t Task) Result { c.Action("compute", func(_ context.Context, opts Options) Result {
if t == "compute" { return Result{Value: 42, OK: true}
return Result{Value: 42, OK: true}
}
return Result{}
}) })
r := c.PERFORM("compute") r := c.Action("compute").Run(context.Background(), NewOptions())
assert.True(t, r.OK) assert.True(t, r.OK)
assert.Equal(t, 42, r.Value) assert.Equal(t, 42, r.Value)
} }

94
task.go
View file

@ -1,77 +1,61 @@
// SPDX-License-Identifier: EUPL-1.2 // SPDX-License-Identifier: EUPL-1.2
// Background task dispatch for the Core framework. // Background action dispatch for the Core framework.
// PerformAsync runs a named Action in a background goroutine with
// panic recovery and progress broadcasting.
package core package core
import ( import "context"
"reflect"
"slices"
"strconv"
)
// TaskState holds background task state. // PerformAsync dispatches a named action in a background goroutine.
type TaskState struct { // Broadcasts ActionTaskStarted, ActionTaskProgress, and ActionTaskCompleted
Identifier string // as IPC messages so other services can track progress.
Task Task //
Result any // r := c.PerformAsync("agentic.dispatch", opts)
Error error // taskID := r.Value.(string)
} func (c *Core) PerformAsync(action string, opts Options) Result {
// PerformAsync dispatches a task in a background goroutine.
func (c *Core) PerformAsync(t Task) Result {
if c.shutdown.Load() { if c.shutdown.Load() {
return Result{} return Result{}
} }
taskID := Concat("task-", strconv.FormatUint(c.taskIDCounter.Add(1), 10)) taskID := ID()
if tid, ok := t.(TaskWithIdentifier); ok {
tid.SetTaskIdentifier(taskID) c.ACTION(ActionTaskStarted{TaskIdentifier: taskID, Action: action, Options: opts})
}
c.ACTION(ActionTaskStarted{TaskIdentifier: taskID, Task: t})
c.waitGroup.Go(func() { c.waitGroup.Go(func() {
defer func() { defer func() {
if rec := recover(); rec != nil { if rec := recover(); rec != nil {
err := E("core.PerformAsync", Sprint("panic: ", rec), nil) c.ACTION(ActionTaskCompleted{
c.ACTION(ActionTaskCompleted{TaskIdentifier: taskID, Task: t, Result: nil, Error: err}) TaskIdentifier: taskID,
Action: action,
Result: Result{E("core.PerformAsync", Sprint("panic: ", rec), nil), false},
})
} }
}() }()
r := c.PERFORM(t)
var err error r := c.Action(action).Run(context.Background(), opts)
if !r.OK {
if e, ok := r.Value.(error); ok { c.ACTION(ActionTaskCompleted{
err = e TaskIdentifier: taskID,
} else { Action: action,
taskType := reflect.TypeOf(t) Result: r,
typeName := "<nil>" })
if taskType != nil {
typeName = taskType.String()
}
err = E("core.PerformAsync", Join(" ", "no handler found for task type", typeName), nil)
}
}
c.ACTION(ActionTaskCompleted{TaskIdentifier: taskID, Task: t, Result: r.Value, Error: err})
}) })
return Result{taskID, true} return Result{taskID, true}
} }
// Progress broadcasts a progress update for a background task. // Progress broadcasts a progress update for a background task.
func (c *Core) Progress(taskID string, progress float64, message string, t Task) { //
c.ACTION(ActionTaskProgress{TaskIdentifier: taskID, Task: t, Progress: progress, Message: message}) // c.Progress(taskID, 0.5, "halfway done", "agentic.dispatch")
func (c *Core) Progress(taskID string, progress float64, message string, action string) {
c.ACTION(ActionTaskProgress{
TaskIdentifier: taskID,
Action: action,
Progress: progress,
Message: message,
})
} }
func (c *Core) Perform(t Task) Result { // Registration methods (RegisterAction, RegisterActions)
c.ipc.taskMu.RLock()
handlers := slices.Clone(c.ipc.taskHandlers)
c.ipc.taskMu.RUnlock()
for _, h := range handlers {
r := h(c, t)
if r.OK {
return r
}
}
return Result{}
}
// Registration methods (RegisterAction, RegisterActions, RegisterTask)
// are in ipc.go — registration is IPC's responsibility. // are in ipc.go — registration is IPC's responsibility.

View file

@ -17,17 +17,16 @@ func TestTask_PerformAsync_Good(t *testing.T) {
var mu sync.Mutex var mu sync.Mutex
var result string var result string
c.RegisterTask(func(_ *Core, task Task) Result { c.Action("work", func(_ context.Context, _ Options) Result {
mu.Lock() mu.Lock()
result = "done" result = "done"
mu.Unlock() mu.Unlock()
return Result{"completed", true} return Result{Value: "done", OK: true}
}) })
r := c.PerformAsync("work") r := c.PerformAsync("work", NewOptions())
assert.True(t, r.OK) assert.True(t, r.OK)
taskID := r.Value.(string) assert.True(t, HasPrefix(r.Value.(string), "id-"), "should return task ID")
assert.NotEmpty(t, taskID)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -36,24 +35,25 @@ func TestTask_PerformAsync_Good(t *testing.T) {
mu.Unlock() mu.Unlock()
} }
func TestTask_PerformAsync_Progress_Good(t *testing.T) { func TestTask_PerformAsync_Good_Progress(t *testing.T) {
c := New() c := New()
c.RegisterTask(func(_ *Core, task Task) Result { c.Action("tracked", func(_ context.Context, _ Options) Result {
return Result{OK: true} return Result{OK: true}
}) })
r := c.PerformAsync("work") r := c.PerformAsync("tracked", NewOptions())
taskID := r.Value.(string) taskID := r.Value.(string)
c.Progress(taskID, 0.5, "halfway", "work") c.Progress(taskID, 0.5, "halfway", "tracked")
} }
func TestTask_PerformAsync_Completion_Good(t *testing.T) { func TestTask_PerformAsync_Good_Completion(t *testing.T) {
c := New() c := New()
completed := make(chan ActionTaskCompleted, 1) completed := make(chan ActionTaskCompleted, 1)
c.RegisterTask(func(_ *Core, task Task) Result { c.Action("completable", func(_ context.Context, _ Options) Result {
return Result{Value: "result", OK: true} return Result{Value: "output", OK: true}
}) })
c.RegisterAction(func(_ *Core, msg Message) Result { c.RegisterAction(func(_ *Core, msg Message) Result {
if evt, ok := msg.(ActionTaskCompleted); ok { if evt, ok := msg.(ActionTaskCompleted); ok {
completed <- evt completed <- evt
@ -61,18 +61,18 @@ func TestTask_PerformAsync_Completion_Good(t *testing.T) {
return Result{OK: true} return Result{OK: true}
}) })
c.PerformAsync("work") c.PerformAsync("completable", NewOptions())
select { select {
case evt := <-completed: case evt := <-completed:
assert.Nil(t, evt.Error) assert.True(t, evt.Result.OK)
assert.Equal(t, "result", evt.Result) assert.Equal(t, "output", evt.Result.Value)
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for completion") t.Fatal("timed out waiting for completion")
} }
} }
func TestTask_PerformAsync_NoHandler_Good(t *testing.T) { func TestTask_PerformAsync_Bad_ActionNotRegistered(t *testing.T) {
c := New() c := New()
completed := make(chan ActionTaskCompleted, 1) completed := make(chan ActionTaskCompleted, 1)
@ -83,26 +83,28 @@ func TestTask_PerformAsync_NoHandler_Good(t *testing.T) {
return Result{OK: true} return Result{OK: true}
}) })
c.PerformAsync("unhandled") c.PerformAsync("nonexistent", NewOptions())
select { select {
case evt := <-completed: case evt := <-completed:
assert.NotNil(t, evt.Error) assert.False(t, evt.Result.OK, "unregistered action should fail")
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatal("timed out") t.Fatal("timed out")
} }
} }
func TestTask_PerformAsync_AfterShutdown_Bad(t *testing.T) { func TestTask_PerformAsync_Bad_AfterShutdown(t *testing.T) {
c := New() c := New()
c.Action("work", func(_ context.Context, _ Options) Result { return Result{OK: true} })
c.ServiceStartup(context.Background(), nil) c.ServiceStartup(context.Background(), nil)
c.ServiceShutdown(context.Background()) c.ServiceShutdown(context.Background())
r := c.PerformAsync("should not run") r := c.PerformAsync("work", NewOptions())
assert.False(t, r.OK) assert.False(t, r.OK)
} }
// --- RegisterAction + RegisterActions --- // --- RegisterAction + RegisterActions (broadcast handlers) ---
func TestTask_RegisterAction_Good(t *testing.T) { func TestTask_RegisterAction_Good(t *testing.T) {
c := New() c := New()