From 7d62899b7e41c30a123ac5bbe8dc5fb2520aa8b6 Mon Sep 17 00:00:00 2001 From: Virgil Date: Tue, 31 Mar 2026 06:44:56 +0000 Subject: [PATCH] fix(agentic): restore RFC completion pipeline Co-Authored-By: Virgil --- pkg/agentic/actions.go | 3 + pkg/agentic/actions_test.go | 16 +++++ pkg/agentic/dispatch.go | 6 -- pkg/agentic/handlers.go | 116 ++++++++++++++++++++++++++++++++--- pkg/agentic/handlers_test.go | 107 +++++++++++++++++++++++++++----- pkg/agentic/register.go | 1 + 6 files changed, 222 insertions(+), 27 deletions(-) diff --git a/pkg/agentic/actions.go b/pkg/agentic/actions.go index 56ce8f8..a1f2918 100644 --- a/pkg/agentic/actions.go +++ b/pkg/agentic/actions.go @@ -285,6 +285,9 @@ func (s *PrepSubsystem) handleIngest(ctx context.Context, options core.Options) // result := c.Action("agentic.poke").Run(ctx, core.NewOptions()) func (s *PrepSubsystem) handlePoke(ctx context.Context, _ core.Options) core.Result { + if s.ServiceRuntime != nil && s.Core().Action("runner.poke").Exists() { + return s.Core().Action("runner.poke").Run(ctx, core.NewOptions()) + } s.Poke() return core.Result{OK: true} } diff --git a/pkg/agentic/actions_test.go b/pkg/agentic/actions_test.go index 83d3777..b4743bc 100644 --- a/pkg/agentic/actions_test.go +++ b/pkg/agentic/actions_test.go @@ -81,6 +81,22 @@ func TestActions_HandlePoke_Good(t *testing.T) { assert.True(t, r.OK) } +func TestActions_HandlePoke_Good_DelegatesToRunner(t *testing.T) { + called := false + c := core.New() + c.Action("runner.poke", func(_ context.Context, _ core.Options) core.Result { + called = true + return core.Result{OK: true} + }) + + s := NewPrep() + s.ServiceRuntime = core.NewServiceRuntime(c, AgentOptions{}) + + r := s.handlePoke(context.Background(), core.NewOptions()) + require.True(t, r.OK) + assert.True(t, called) +} + func TestActions_HandleQA_Bad_NoWorkspace(t *testing.T) { s := newPrepWithProcess() r := s.handleQA(context.Background(), core.NewOptions()) diff --git a/pkg/agentic/dispatch.go b/pkg/agentic/dispatch.go index 0779033..ed9024d 100644 --- a/pkg/agentic/dispatch.go +++ b/pkg/agentic/dispatch.go @@ -332,12 +332,6 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string, s.stopIssueTracking(workspaceDir) s.broadcastComplete(agent, workspaceDir, finalStatus) - - if finalStatus == "completed" && s.ServiceRuntime != nil { - s.Core().PerformAsync("agentic.complete", core.NewOptions( - core.Option{Key: "workspace", Value: workspaceDir}, - )) - } } // pid, processID, outputFile, err := s.spawnAgent(agent, prompt, workspaceDir) diff --git a/pkg/agentic/handlers.go b/pkg/agentic/handlers.go index aeee5b7..00b0cc2 100644 --- a/pkg/agentic/handlers.go +++ b/pkg/agentic/handlers.go @@ -7,17 +7,36 @@ import ( core "dappco.re/go/core" ) +// RegisterHandlers(c, subsystem) +// c.ACTION(messages.AgentCompleted{Workspace: "core/go-io/task-5", Repo: "go-io", Status: "completed"}) +func RegisterHandlers(c *core.Core, s *PrepSubsystem) { + if c == nil || s == nil { + return + } + + c.RegisterActions( + func(coreApp *core.Core, msg core.Message) core.Result { + return handleCompletionQA(coreApp, msg) + }, + func(coreApp *core.Core, msg core.Message) core.Result { + return handleCompletionAutoPR(coreApp, msg) + }, + func(coreApp *core.Core, msg core.Message) core.Result { + return handleCompletionVerify(coreApp, msg) + }, + func(coreApp *core.Core, msg core.Message) core.Result { + return handleCompletionIngest(coreApp, msg) + }, + func(coreApp *core.Core, msg core.Message) core.Result { + return handleCompletionPoke(coreApp, msg) + }, + ) +} + // _ = prep.HandleIPCEvents(c, messages.AgentCompleted{Workspace: "core/go-io/task-5", Status: "completed"}) // _ = prep.HandleIPCEvents(c, messages.SpawnQueued{Workspace: "core/go-io/task-5", Agent: "codex", Task: "fix tests"}) func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Result { switch ev := msg.(type) { - case messages.AgentCompleted: - if c.Config().Enabled("auto-ingest") { - if workspaceDir := resolveWorkspace(ev.Workspace); workspaceDir != "" { - s.ingestFindings(workspaceDir) - } - } - case messages.SpawnQueued: workspaceDir := resolveWorkspace(ev.Workspace) if workspaceDir == "" { @@ -48,6 +67,89 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res return core.Result{OK: true} } +func handleCompletionQA(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.AgentCompleted) + if !ok || ev.Status != "completed" { + return core.Result{OK: true} + } + + workspaceDir := resolveWorkspace(ev.Workspace) + if workspaceDir == "" { + return core.Result{OK: true} + } + + performAsyncIfRegistered(c, "agentic.qa", workspaceActionOptions(workspaceDir)) + return core.Result{OK: true} +} + +func handleCompletionAutoPR(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.QAResult) + if !ok || !ev.Passed { + return core.Result{OK: true} + } + + workspaceDir := resolveWorkspace(ev.Workspace) + if workspaceDir == "" { + return core.Result{OK: true} + } + + performAsyncIfRegistered(c, "agentic.auto-pr", workspaceActionOptions(workspaceDir)) + return core.Result{OK: true} +} + +func handleCompletionVerify(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.PRCreated) + if !ok { + return core.Result{OK: true} + } + + workspaceDir := findWorkspaceByPR(ev.Repo, ev.Branch) + if workspaceDir == "" { + return core.Result{OK: true} + } + + performAsyncIfRegistered(c, "agentic.verify", workspaceActionOptions(workspaceDir)) + return core.Result{OK: true} +} + +func handleCompletionIngest(c *core.Core, msg core.Message) core.Result { + ev, ok := msg.(messages.AgentCompleted) + if !ok || c == nil || !c.Config().Enabled("auto-ingest") { + return core.Result{OK: true} + } + + workspaceDir := resolveWorkspace(ev.Workspace) + if workspaceDir == "" { + return core.Result{OK: true} + } + + performAsyncIfRegistered(c, "agentic.ingest", workspaceActionOptions(workspaceDir)) + return core.Result{OK: true} +} + +func handleCompletionPoke(c *core.Core, msg core.Message) core.Result { + if _, ok := msg.(messages.AgentCompleted); !ok { + return core.Result{OK: true} + } + + if performAsyncIfRegistered(c, "runner.poke", core.NewOptions()).OK { + return core.Result{OK: true} + } + performAsyncIfRegistered(c, "agentic.poke", core.NewOptions()) + return core.Result{OK: true} +} + +func workspaceActionOptions(workspaceDir string) core.Options { + return core.NewOptions(core.Option{Key: "workspace", Value: workspaceDir}) +} + +func performAsyncIfRegistered(c *core.Core, action string, options core.Options) core.Result { + if c == nil || !c.Action(action).Exists() { + return core.Result{} + } + return c.PerformAsync(action, options) +} + // spawnResult := prep.SpawnFromQueue("codex", prompt, workspaceDir) // pid := spawnResult.Value.(int) func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, workspaceDir string) core.Result { diff --git a/pkg/agentic/handlers_test.go b/pkg/agentic/handlers_test.go index e671e46..2aa85a3 100644 --- a/pkg/agentic/handlers_test.go +++ b/pkg/agentic/handlers_test.go @@ -3,12 +3,15 @@ package agentic import ( + "context" + "sync" "testing" "time" "dappco.re/go/agent/pkg/messages" core "dappco.re/go/core" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // newCoreForHandlerTests creates a Core with PrepSubsystem registered via @@ -31,6 +34,7 @@ func newCoreForHandlerTests(t *testing.T) (*core.Core, *PrepSubsystem) { s.ServiceRuntime = core.NewServiceRuntime(c, AgentOptions{}) // RegisterService auto-discovers HandleIPCEvents on PrepSubsystem c.RegisterService("agentic", s) + RegisterHandlers(c, s) return c, s } @@ -46,25 +50,22 @@ func TestHandlers_HandleIPCEvents_Good(t *testing.T) { } func TestHandlers_PokeOnCompletion_Good(t *testing.T) { - _, s := newCoreForHandlerTests(t) + c, _ := newCoreForHandlerTests(t) - // Drain any existing poke - select { - case <-s.pokeCh: - default: - } + poked := make(chan struct{}, 1) + c.Action("runner.poke", func(_ context.Context, _ core.Options) core.Result { + select { + case poked <- struct{}{}: + default: + } + return core.Result{OK: true} + }) - // HandleIPCEvents receives AgentCompleted → calls Poke - s.HandleIPCEvents(s.Core(), messages.AgentCompleted{ + c.ACTION(messages.AgentCompleted{ Workspace: "ws-test", Repo: "go-io", Status: "completed", }) - select { - case <-s.pokeCh: - // poke received - default: - t.Log("poke signal may not have been received synchronously") - } + require.Eventually(t, func() bool { return len(poked) == 1 }, time.Second, 10*time.Millisecond) } func TestHandlers_IngestOnCompletion_Good(t *testing.T) { @@ -114,6 +115,84 @@ func TestHandlers_PokeQueue_Good(t *testing.T) { // Should call drainQueue without panic } +func TestHandlers_RegisterHandlers_Good_CompletionPipeline(t *testing.T) { + root := t.TempDir() + t.Setenv("CORE_WORKSPACE", root) + + workspaceName := "core/go-io/task-5" + workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5") + require.True(t, fs.EnsureDir(core.JoinPath(workspaceDir, "repo")).OK) + require.NoError(t, writeStatus(workspaceDir, &WorkspaceStatus{ + Status: "completed", + Repo: "go-io", + Branch: "agent/fix-tests", + Agent: "codex", + })) + + var mu sync.Mutex + called := make(map[string]bool) + mark := func(name string) { + mu.Lock() + called[name] = true + mu.Unlock() + } + seen := func(name string) bool { + mu.Lock() + defer mu.Unlock() + return called[name] + } + + c := core.New() + c.Config().Enable("auto-ingest") + RegisterHandlers(c, &PrepSubsystem{}) + + c.Action("agentic.qa", func(_ context.Context, options core.Options) core.Result { + if options.String("workspace") == workspaceDir { + mark("qa") + } + c.ACTION(messages.QAResult{Workspace: workspaceName, Repo: "go-io", Passed: true}) + return core.Result{OK: true} + }) + c.Action("agentic.auto-pr", func(_ context.Context, options core.Options) core.Result { + if options.String("workspace") == workspaceDir { + mark("auto-pr") + } + c.ACTION(messages.PRCreated{ + Repo: "go-io", + Branch: "agent/fix-tests", + PRURL: "https://forge.lthn.ai/core/go-io/pulls/12", + PRNum: 12, + }) + return core.Result{OK: true} + }) + c.Action("agentic.verify", func(_ context.Context, options core.Options) core.Result { + if options.String("workspace") == workspaceDir { + mark("verify") + } + return core.Result{OK: true} + }) + c.Action("agentic.ingest", func(_ context.Context, options core.Options) core.Result { + if options.String("workspace") == workspaceDir { + mark("ingest") + } + return core.Result{OK: true} + }) + c.Action("runner.poke", func(_ context.Context, _ core.Options) core.Result { + mark("poke") + return core.Result{OK: true} + }) + + c.ACTION(messages.AgentCompleted{ + Workspace: workspaceName, + Repo: "go-io", + Status: "completed", + }) + + require.Eventually(t, func() bool { + return seen("qa") && seen("auto-pr") && seen("verify") && seen("ingest") && seen("poke") + }, time.Second, 10*time.Millisecond) +} + func TestHandlers_IngestDisabled_Bad(t *testing.T) { root := t.TempDir() t.Setenv("CORE_WORKSPACE", root) diff --git a/pkg/agentic/register.go b/pkg/agentic/register.go index 051762c..5d5f679 100644 --- a/pkg/agentic/register.go +++ b/pkg/agentic/register.go @@ -30,6 +30,7 @@ func Register(c *core.Core) core.Result { c.Config().Enable("auto-pr") c.Config().Enable("auto-merge") c.Config().Enable("auto-ingest") + RegisterHandlers(c, subsystem) return core.Result{Value: subsystem, OK: true} }