From 028ec84c5e69ab893d1cfa6ebc52546c9257eba5 Mon Sep 17 00:00:00 2001 From: Snider Date: Wed, 25 Mar 2026 15:57:36 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20remove=20type=20Task=20any=20=E2=80=94?= =?UTF-8?q?=20untyped=20IPC=20replaced=20by=20named=20Actions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The old IPC Task system passed `any` through TaskHandler and PerformAsync. Now that named Actions exist with typed signatures (ActionHandler func(context.Context, Options) Result), the untyped layer is dead weight. Changes: - type Task any removed (was in contract.go) - type Task struct is now the composed sequence (action.go) - PerformAsync takes (action string, opts Options) not (t Task) - TaskHandler type removed — use c.Action("name", handler) - RegisterTask removed — use c.Action("name", handler) - PERFORM sugar removed — use c.Action("name").Run() - ActionTaskStarted/Progress/Completed carry typed fields (Action string, Options, Result) not any ActionDef → Action rename also in this commit (same principle: DTOs don't have Run() methods). Co-Authored-By: Virgil --- action.go | 20 +++++------ action_test.go | 14 ++++---- contract.go | 25 ++++---------- core.go | 1 - docs/RFC.md | 6 ++-- ipc.go | 11 +----- ipc_test.go | 14 ++++---- task.go | 94 +++++++++++++++++++++----------------------------- task_test.go | 44 ++++++++++++----------- 9 files changed, 96 insertions(+), 133 deletions(-) diff --git a/action.go b/action.go index 03f4348..357c87c 100644 --- a/action.go +++ b/action.go @@ -122,9 +122,9 @@ type Step struct { Input string // "previous" = output of last step piped as input } -// TaskDef is a named sequence of Steps. +// Task is a named sequence of Steps. // -// c.Task("agent.completion", core.TaskDef{ +// c.Task("agent.completion", core.Task{ // Steps: []core.Step{ // {Action: "agentic.qa"}, // {Action: "agentic.auto-pr"}, @@ -132,7 +132,7 @@ type Step struct { // {Action: "agentic.poke", Async: true}, // }, // }) -type TaskDef struct { +type Task struct { Name string Description string Steps []Step @@ -143,7 +143,7 @@ type TaskDef struct { // The "previous" input pipes the last sync step's output to the next step. // // r := c.Task("deploy").Run(ctx, opts) -func (t *TaskDef) Run(ctx context.Context, c *Core, opts Options) Result { +func (t *Task) Run(ctx context.Context, c *Core, opts Options) Result { if t == nil || len(t.Steps) == 0 { return Result{E("task.Run", Concat("task has no steps: ", t.safeName()), nil), false} } @@ -187,7 +187,7 @@ func (t *TaskDef) Run(ctx context.Context, c *Core, opts Options) Result { return lastResult } -func (t *TaskDef) safeName() string { +func (t *Task) safeName() string { if t == nil { return "" } @@ -201,12 +201,12 @@ func stepOptions(step Step) Options { } // Task gets or registers a named task. -// With a TaskDef argument: registers the task. +// With a Task argument: registers the task. // Without: returns the task for invocation. // -// c.Task("deploy", core.TaskDef{Steps: steps}) // register +// c.Task("deploy", core.Task{Steps: steps}) // register // c.Task("deploy").Run(ctx, c, opts) // invoke -func (c *Core) Task(name string, def ...TaskDef) *TaskDef { +func (c *Core) Task(name string, def ...Task) *Task { if len(def) > 0 { d := def[0] d.Name = name @@ -215,9 +215,9 @@ func (c *Core) Task(name string, def ...TaskDef) *TaskDef { } r := c.ipc.tasks.Get(name) if !r.OK { - return &TaskDef{Name: name} + return &Task{Name: name} } - return r.Value.(*TaskDef) + return r.Value.(*Task) } // Tasks returns all registered task names. diff --git a/action_test.go b/action_test.go index 988ac3f..077a57a 100644 --- a/action_test.go +++ b/action_test.go @@ -153,7 +153,7 @@ func TestAction_Task_Good_Sequential(t *testing.T) { return Result{Value: "output-b", OK: true} }) - c.Task("pipeline", TaskDef{ + c.Task("pipeline", Task{ Steps: []Step{ {Action: "step.a"}, {Action: "step.b"}, @@ -182,7 +182,7 @@ func TestAction_Task_Bad_StepFails(t *testing.T) { return Result{OK: true} }) - c.Task("broken", TaskDef{ + c.Task("broken", Task{ Steps: []Step{ {Action: "step.ok"}, {Action: "step.fail"}, @@ -197,7 +197,7 @@ func TestAction_Task_Bad_StepFails(t *testing.T) { func TestAction_Task_Bad_MissingAction(t *testing.T) { c := New() - c.Task("missing", TaskDef{ + c.Task("missing", Task{ Steps: []Step{ {Action: "nonexistent"}, }, @@ -219,7 +219,7 @@ func TestAction_Task_Good_PreviousInput(t *testing.T) { return Result{Value: "got: " + input.Value.(string), OK: true} }) - c.Task("pipe", TaskDef{ + c.Task("pipe", Task{ Steps: []Step{ {Action: "produce"}, {Action: "consume", Input: "previous"}, @@ -233,14 +233,14 @@ func TestAction_Task_Good_PreviousInput(t *testing.T) { func TestAction_Task_Ugly_EmptySteps(t *testing.T) { c := New() - c.Task("empty", TaskDef{}) + c.Task("empty", Task{}) r := c.Task("empty").Run(context.Background(), c, NewOptions()) assert.False(t, r.OK) } func TestAction_Tasks_Good(t *testing.T) { c := New() - c.Task("deploy", TaskDef{Steps: []Step{{Action: "x"}}}) - c.Task("review", TaskDef{Steps: []Step{{Action: "y"}}}) + c.Task("deploy", Task{Steps: []Step{{Action: "x"}}}) + c.Task("review", Task{Steps: []Step{{Action: "y"}}}) assert.Equal(t, []string{"deploy", "review"}, c.Tasks()) } diff --git a/contract.go b/contract.go index db3a41f..793ecd6 100644 --- a/contract.go +++ b/contract.go @@ -16,22 +16,9 @@ type Message any // Query is the type for read-only IPC requests. type Query any -// Task is the type for IPC requests that perform side effects. -type Task any - -// TaskWithIdentifier is an optional interface for tasks that need to know their assigned identifier. -type TaskWithIdentifier interface { - Task - SetTaskIdentifier(id string) - GetTaskIdentifier() string -} - // QueryHandler handles Query requests. Returns Result{Value, OK}. type QueryHandler func(*Core, Query) Result -// TaskHandler handles Task requests. Returns Result{Value, OK}. -type TaskHandler func(*Core, Task) Result - // Startable is implemented by services that need startup initialisation. // // func (s *MyService) OnStartup(ctx context.Context) core.Result { @@ -57,21 +44,21 @@ type ActionServiceShutdown struct{} type ActionTaskStarted struct { TaskIdentifier string - Task Task + Action string + Options Options } type ActionTaskProgress struct { TaskIdentifier string - Task Task + Action string Progress float64 Message string } type ActionTaskCompleted struct { TaskIdentifier string - Task Task - Result any - Error error + Action string + Result Result } // --- Constructor --- @@ -106,7 +93,7 @@ func New(opts ...CoreOption) *Core { error: &ErrorPanic{}, log: &ErrorLog{}, lock: &Lock{locks: NewRegistry[*sync.RWMutex]()}, - ipc: &Ipc{actions: NewRegistry[*Action](), tasks: NewRegistry[*TaskDef]()}, + ipc: &Ipc{actions: NewRegistry[*Action](), tasks: NewRegistry[*Task]()}, info: systemInfo, i18n: &I18n{}, services: &ServiceRegistry{Registry: NewRegistry[*Service]()}, diff --git a/core.go b/core.go index 8a454d6..94f0ee0 100644 --- a/core.go +++ b/core.go @@ -108,7 +108,6 @@ func (c *Core) Run() { func (c *Core) ACTION(msg Message) Result { return c.broadcast(msg) } func (c *Core) QUERY(q Query) Result { return c.Query(q) } func (c *Core) QUERYALL(q Query) Result { return c.QueryAll(q) } -func (c *Core) PERFORM(t Task) Result { return c.Perform(t) } // --- Error+Log --- diff --git a/docs/RFC.md b/docs/RFC.md index 030f376..0aa621c 100644 --- a/docs/RFC.md +++ b/docs/RFC.md @@ -2540,6 +2540,8 @@ c.Task("agent-completion-pipeline", core.TaskDef{ The Task executor runs steps in order, with `Async: true` steps dispatched in parallel. Ingest and Poke don't wait for the pipeline — they fire immediately. The pipeline has a timeout. Each step has its own error handling. +**Status (2026-03-25):** core/go `TaskDef` with `Steps` is implemented and tested. The core/agent wiring is documented in `core/agent/docs/plans/2026-03-25-core-go-v0.8.0-migration.md` Priority 5. + ### P6-2. Every Handler Receives Every Message — O(handlers × messages) All 5 handlers are called for every ACTION. Each handler type-checks and skips if it's not their message. With N handlers and M message types, this is O(N×M) per event — every handler processes every message even if it only cares about one type. @@ -3693,7 +3695,7 @@ The meta-assumption: this RFC is complete. It's not. It's the best single-sessio **This is by design for v0.8.0.** All services are first-party trusted code. The Lego Bricks philosophy says "export everything." The tension is: Lego Bricks vs Least Privilege. -**Resolution for v0.9.0+:** Entitlements, not CoreView. The boundary system already exists in CorePHP (RFC-004: Entitlements). Port it: +**Resolution:** Section 21 (Entitlement primitive) — designed, implementation pending. Brought forward from v0.9.0 to v0.8.0. Port RFC-004 concept: ``` Registration = capability ("process.run action exists") @@ -3715,7 +3717,7 @@ IPC dispatch is synchronous. Startup is synchronous. File I/O assumes no concurr **The cascade (P6-1) is the symptom.** The root cause is that Core was designed for sequential execution and concurrency was added incrementally without revisiting the foundations. -**Resolution:** The Action/Task system (Section 18) is the fix. Actions execute with concurrency control. Tasks define parallel/sequential composition. The IPC bus stops being the execution engine — it becomes the notification channel. PERFORM replaces ACTION for request/response. Async is opt-in per Action, not per handler. +**Resolution:** The Action/Task system (Section 18) is implemented in core/go. `TaskDef` with `Steps` supports sequential chains, async dispatch, and previous-input piping. The cascade fix requires core/agent to wire its handlers as named Actions and replace the nested `c.ACTION()` calls with `c.Task("agent.completion").Run()`. See `core/agent/docs/plans/2026-03-25-core-go-v0.8.0-migration.md` Priority 5. ### Root Cause 4: No Recovery Path — 10 findings diff --git a/ipc.go b/ipc.go index f2b998f..249e770 100644 --- a/ipc.go +++ b/ipc.go @@ -21,11 +21,8 @@ type Ipc struct { queryMu sync.RWMutex queryHandlers []QueryHandler - taskMu sync.RWMutex - taskHandlers []TaskHandler - actions *Registry[*Action] // named action registry - tasks *Registry[*TaskDef] // named task registry + tasks *Registry[*Task] // named task registry } // broadcast dispatches a message to all registered IPC handlers. @@ -104,9 +101,3 @@ func (c *Core) RegisterActions(handlers ...func(*Core, Message) Result) { c.ipc.ipcMu.Unlock() } -// RegisterTask registers a handler for PERFORM task dispatch. -func (c *Core) RegisterTask(handler TaskHandler) { - c.ipc.taskMu.Lock() - c.ipc.taskHandlers = append(c.ipc.taskHandlers, handler) - c.ipc.taskMu.Unlock() -} diff --git a/ipc_test.go b/ipc_test.go index fcacb02..e0a4b20 100644 --- a/ipc_test.go +++ b/ipc_test.go @@ -1,6 +1,7 @@ package core_test import ( + "context" "testing" . "dappco.re/go/core" @@ -128,17 +129,14 @@ func TestIpc_QueryAll_Good(t *testing.T) { assert.Contains(t, results, "b") } -// --- IPC: Tasks --- +// --- IPC: Named Action Invocation --- -func TestIpc_Perform_Good(t *testing.T) { +func TestIpc_ActionInvoke_Good(t *testing.T) { c := New() - c.RegisterTask(func(_ *Core, t Task) Result { - if t == "compute" { - return Result{Value: 42, OK: true} - } - return Result{} + c.Action("compute", func(_ context.Context, opts Options) Result { + return Result{Value: 42, OK: true} }) - r := c.PERFORM("compute") + r := c.Action("compute").Run(context.Background(), NewOptions()) assert.True(t, r.OK) assert.Equal(t, 42, r.Value) } diff --git a/task.go b/task.go index 6d6eddf..b761f9d 100644 --- a/task.go +++ b/task.go @@ -1,77 +1,61 @@ // SPDX-License-Identifier: EUPL-1.2 -// Background task dispatch for the Core framework. +// Background action dispatch for the Core framework. +// PerformAsync runs a named Action in a background goroutine with +// panic recovery and progress broadcasting. package core -import ( - "reflect" - "slices" - "strconv" -) +import "context" -// TaskState holds background task state. -type TaskState struct { - Identifier string - Task Task - Result any - Error error -} - -// PerformAsync dispatches a task in a background goroutine. -func (c *Core) PerformAsync(t Task) Result { +// PerformAsync dispatches a named action in a background goroutine. +// Broadcasts ActionTaskStarted, ActionTaskProgress, and ActionTaskCompleted +// as IPC messages so other services can track progress. +// +// r := c.PerformAsync("agentic.dispatch", opts) +// taskID := r.Value.(string) +func (c *Core) PerformAsync(action string, opts Options) Result { if c.shutdown.Load() { return Result{} } - taskID := Concat("task-", strconv.FormatUint(c.taskIDCounter.Add(1), 10)) - if tid, ok := t.(TaskWithIdentifier); ok { - tid.SetTaskIdentifier(taskID) - } - c.ACTION(ActionTaskStarted{TaskIdentifier: taskID, Task: t}) + taskID := ID() + + c.ACTION(ActionTaskStarted{TaskIdentifier: taskID, Action: action, Options: opts}) + c.waitGroup.Go(func() { defer func() { if rec := recover(); rec != nil { - err := E("core.PerformAsync", Sprint("panic: ", rec), nil) - c.ACTION(ActionTaskCompleted{TaskIdentifier: taskID, Task: t, Result: nil, Error: err}) + c.ACTION(ActionTaskCompleted{ + TaskIdentifier: taskID, + Action: action, + Result: Result{E("core.PerformAsync", Sprint("panic: ", rec), nil), false}, + }) } }() - r := c.PERFORM(t) - var err error - if !r.OK { - if e, ok := r.Value.(error); ok { - err = e - } else { - taskType := reflect.TypeOf(t) - typeName := "" - if taskType != nil { - typeName = taskType.String() - } - err = E("core.PerformAsync", Join(" ", "no handler found for task type", typeName), nil) - } - } - c.ACTION(ActionTaskCompleted{TaskIdentifier: taskID, Task: t, Result: r.Value, Error: err}) + + r := c.Action(action).Run(context.Background(), opts) + + c.ACTION(ActionTaskCompleted{ + TaskIdentifier: taskID, + Action: action, + Result: r, + }) }) + return Result{taskID, true} } // Progress broadcasts a progress update for a background task. -func (c *Core) Progress(taskID string, progress float64, message string, t Task) { - c.ACTION(ActionTaskProgress{TaskIdentifier: taskID, Task: t, Progress: progress, Message: message}) +// +// c.Progress(taskID, 0.5, "halfway done", "agentic.dispatch") +func (c *Core) Progress(taskID string, progress float64, message string, action string) { + c.ACTION(ActionTaskProgress{ + TaskIdentifier: taskID, + Action: action, + Progress: progress, + Message: message, + }) } -func (c *Core) Perform(t Task) Result { - c.ipc.taskMu.RLock() - handlers := slices.Clone(c.ipc.taskHandlers) - c.ipc.taskMu.RUnlock() - - for _, h := range handlers { - r := h(c, t) - if r.OK { - return r - } - } - return Result{} -} - -// Registration methods (RegisterAction, RegisterActions, RegisterTask) +// Registration methods (RegisterAction, RegisterActions) // are in ipc.go — registration is IPC's responsibility. diff --git a/task_test.go b/task_test.go index 5c3eb4f..b23600e 100644 --- a/task_test.go +++ b/task_test.go @@ -17,17 +17,16 @@ func TestTask_PerformAsync_Good(t *testing.T) { var mu sync.Mutex var result string - c.RegisterTask(func(_ *Core, task Task) Result { + c.Action("work", func(_ context.Context, _ Options) Result { mu.Lock() result = "done" mu.Unlock() - return Result{"completed", true} + return Result{Value: "done", OK: true} }) - r := c.PerformAsync("work") + r := c.PerformAsync("work", NewOptions()) assert.True(t, r.OK) - taskID := r.Value.(string) - assert.NotEmpty(t, taskID) + assert.True(t, HasPrefix(r.Value.(string), "id-"), "should return task ID") time.Sleep(100 * time.Millisecond) @@ -36,24 +35,25 @@ func TestTask_PerformAsync_Good(t *testing.T) { mu.Unlock() } -func TestTask_PerformAsync_Progress_Good(t *testing.T) { +func TestTask_PerformAsync_Good_Progress(t *testing.T) { c := New() - c.RegisterTask(func(_ *Core, task Task) Result { + c.Action("tracked", func(_ context.Context, _ Options) Result { return Result{OK: true} }) - r := c.PerformAsync("work") + r := c.PerformAsync("tracked", NewOptions()) taskID := r.Value.(string) - c.Progress(taskID, 0.5, "halfway", "work") + c.Progress(taskID, 0.5, "halfway", "tracked") } -func TestTask_PerformAsync_Completion_Good(t *testing.T) { +func TestTask_PerformAsync_Good_Completion(t *testing.T) { c := New() completed := make(chan ActionTaskCompleted, 1) - c.RegisterTask(func(_ *Core, task Task) Result { - return Result{Value: "result", OK: true} + c.Action("completable", func(_ context.Context, _ Options) Result { + return Result{Value: "output", OK: true} }) + c.RegisterAction(func(_ *Core, msg Message) Result { if evt, ok := msg.(ActionTaskCompleted); ok { completed <- evt @@ -61,18 +61,18 @@ func TestTask_PerformAsync_Completion_Good(t *testing.T) { return Result{OK: true} }) - c.PerformAsync("work") + c.PerformAsync("completable", NewOptions()) select { case evt := <-completed: - assert.Nil(t, evt.Error) - assert.Equal(t, "result", evt.Result) + assert.True(t, evt.Result.OK) + assert.Equal(t, "output", evt.Result.Value) case <-time.After(2 * time.Second): t.Fatal("timed out waiting for completion") } } -func TestTask_PerformAsync_NoHandler_Good(t *testing.T) { +func TestTask_PerformAsync_Bad_ActionNotRegistered(t *testing.T) { c := New() completed := make(chan ActionTaskCompleted, 1) @@ -83,26 +83,28 @@ func TestTask_PerformAsync_NoHandler_Good(t *testing.T) { return Result{OK: true} }) - c.PerformAsync("unhandled") + c.PerformAsync("nonexistent", NewOptions()) select { case evt := <-completed: - assert.NotNil(t, evt.Error) + assert.False(t, evt.Result.OK, "unregistered action should fail") case <-time.After(2 * time.Second): t.Fatal("timed out") } } -func TestTask_PerformAsync_AfterShutdown_Bad(t *testing.T) { +func TestTask_PerformAsync_Bad_AfterShutdown(t *testing.T) { c := New() + c.Action("work", func(_ context.Context, _ Options) Result { return Result{OK: true} }) + c.ServiceStartup(context.Background(), nil) c.ServiceShutdown(context.Background()) - r := c.PerformAsync("should not run") + r := c.PerformAsync("work", NewOptions()) assert.False(t, r.OK) } -// --- RegisterAction + RegisterActions --- +// --- RegisterAction + RegisterActions (broadcast handlers) --- func TestTask_RegisterAction_Good(t *testing.T) { c := New()