fix(agentic): restore RFC completion pipeline

Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
Virgil 2026-03-31 06:44:56 +00:00
parent e1496c21d7
commit 7d62899b7e
6 changed files with 222 additions and 27 deletions

View file

@ -285,6 +285,9 @@ func (s *PrepSubsystem) handleIngest(ctx context.Context, options core.Options)
// result := c.Action("agentic.poke").Run(ctx, core.NewOptions())
func (s *PrepSubsystem) handlePoke(ctx context.Context, _ core.Options) core.Result {
if s.ServiceRuntime != nil && s.Core().Action("runner.poke").Exists() {
return s.Core().Action("runner.poke").Run(ctx, core.NewOptions())
}
s.Poke()
return core.Result{OK: true}
}

View file

@ -81,6 +81,22 @@ func TestActions_HandlePoke_Good(t *testing.T) {
assert.True(t, r.OK)
}
func TestActions_HandlePoke_Good_DelegatesToRunner(t *testing.T) {
called := false
c := core.New()
c.Action("runner.poke", func(_ context.Context, _ core.Options) core.Result {
called = true
return core.Result{OK: true}
})
s := NewPrep()
s.ServiceRuntime = core.NewServiceRuntime(c, AgentOptions{})
r := s.handlePoke(context.Background(), core.NewOptions())
require.True(t, r.OK)
assert.True(t, called)
}
func TestActions_HandleQA_Bad_NoWorkspace(t *testing.T) {
s := newPrepWithProcess()
r := s.handleQA(context.Background(), core.NewOptions())

View file

@ -332,12 +332,6 @@ func (s *PrepSubsystem) onAgentComplete(agent, workspaceDir, outputFile string,
s.stopIssueTracking(workspaceDir)
s.broadcastComplete(agent, workspaceDir, finalStatus)
if finalStatus == "completed" && s.ServiceRuntime != nil {
s.Core().PerformAsync("agentic.complete", core.NewOptions(
core.Option{Key: "workspace", Value: workspaceDir},
))
}
}
// pid, processID, outputFile, err := s.spawnAgent(agent, prompt, workspaceDir)

View file

@ -7,17 +7,36 @@ import (
core "dappco.re/go/core"
)
// RegisterHandlers(c, subsystem)
// c.ACTION(messages.AgentCompleted{Workspace: "core/go-io/task-5", Repo: "go-io", Status: "completed"})
func RegisterHandlers(c *core.Core, s *PrepSubsystem) {
if c == nil || s == nil {
return
}
c.RegisterActions(
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionQA(coreApp, msg)
},
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionAutoPR(coreApp, msg)
},
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionVerify(coreApp, msg)
},
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionIngest(coreApp, msg)
},
func(coreApp *core.Core, msg core.Message) core.Result {
return handleCompletionPoke(coreApp, msg)
},
)
}
// _ = prep.HandleIPCEvents(c, messages.AgentCompleted{Workspace: "core/go-io/task-5", Status: "completed"})
// _ = prep.HandleIPCEvents(c, messages.SpawnQueued{Workspace: "core/go-io/task-5", Agent: "codex", Task: "fix tests"})
func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Result {
switch ev := msg.(type) {
case messages.AgentCompleted:
if c.Config().Enabled("auto-ingest") {
if workspaceDir := resolveWorkspace(ev.Workspace); workspaceDir != "" {
s.ingestFindings(workspaceDir)
}
}
case messages.SpawnQueued:
workspaceDir := resolveWorkspace(ev.Workspace)
if workspaceDir == "" {
@ -48,6 +67,89 @@ func (s *PrepSubsystem) HandleIPCEvents(c *core.Core, msg core.Message) core.Res
return core.Result{OK: true}
}
func handleCompletionQA(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.AgentCompleted)
if !ok || ev.Status != "completed" {
return core.Result{OK: true}
}
workspaceDir := resolveWorkspace(ev.Workspace)
if workspaceDir == "" {
return core.Result{OK: true}
}
performAsyncIfRegistered(c, "agentic.qa", workspaceActionOptions(workspaceDir))
return core.Result{OK: true}
}
func handleCompletionAutoPR(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.QAResult)
if !ok || !ev.Passed {
return core.Result{OK: true}
}
workspaceDir := resolveWorkspace(ev.Workspace)
if workspaceDir == "" {
return core.Result{OK: true}
}
performAsyncIfRegistered(c, "agentic.auto-pr", workspaceActionOptions(workspaceDir))
return core.Result{OK: true}
}
func handleCompletionVerify(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.PRCreated)
if !ok {
return core.Result{OK: true}
}
workspaceDir := findWorkspaceByPR(ev.Repo, ev.Branch)
if workspaceDir == "" {
return core.Result{OK: true}
}
performAsyncIfRegistered(c, "agentic.verify", workspaceActionOptions(workspaceDir))
return core.Result{OK: true}
}
func handleCompletionIngest(c *core.Core, msg core.Message) core.Result {
ev, ok := msg.(messages.AgentCompleted)
if !ok || c == nil || !c.Config().Enabled("auto-ingest") {
return core.Result{OK: true}
}
workspaceDir := resolveWorkspace(ev.Workspace)
if workspaceDir == "" {
return core.Result{OK: true}
}
performAsyncIfRegistered(c, "agentic.ingest", workspaceActionOptions(workspaceDir))
return core.Result{OK: true}
}
func handleCompletionPoke(c *core.Core, msg core.Message) core.Result {
if _, ok := msg.(messages.AgentCompleted); !ok {
return core.Result{OK: true}
}
if performAsyncIfRegistered(c, "runner.poke", core.NewOptions()).OK {
return core.Result{OK: true}
}
performAsyncIfRegistered(c, "agentic.poke", core.NewOptions())
return core.Result{OK: true}
}
func workspaceActionOptions(workspaceDir string) core.Options {
return core.NewOptions(core.Option{Key: "workspace", Value: workspaceDir})
}
func performAsyncIfRegistered(c *core.Core, action string, options core.Options) core.Result {
if c == nil || !c.Action(action).Exists() {
return core.Result{}
}
return c.PerformAsync(action, options)
}
// spawnResult := prep.SpawnFromQueue("codex", prompt, workspaceDir)
// pid := spawnResult.Value.(int)
func (s *PrepSubsystem) SpawnFromQueue(agent, prompt, workspaceDir string) core.Result {

View file

@ -3,12 +3,15 @@
package agentic
import (
"context"
"sync"
"testing"
"time"
"dappco.re/go/agent/pkg/messages"
core "dappco.re/go/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// newCoreForHandlerTests creates a Core with PrepSubsystem registered via
@ -31,6 +34,7 @@ func newCoreForHandlerTests(t *testing.T) (*core.Core, *PrepSubsystem) {
s.ServiceRuntime = core.NewServiceRuntime(c, AgentOptions{})
// RegisterService auto-discovers HandleIPCEvents on PrepSubsystem
c.RegisterService("agentic", s)
RegisterHandlers(c, s)
return c, s
}
@ -46,25 +50,22 @@ func TestHandlers_HandleIPCEvents_Good(t *testing.T) {
}
func TestHandlers_PokeOnCompletion_Good(t *testing.T) {
_, s := newCoreForHandlerTests(t)
c, _ := newCoreForHandlerTests(t)
// Drain any existing poke
select {
case <-s.pokeCh:
default:
}
poked := make(chan struct{}, 1)
c.Action("runner.poke", func(_ context.Context, _ core.Options) core.Result {
select {
case poked <- struct{}{}:
default:
}
return core.Result{OK: true}
})
// HandleIPCEvents receives AgentCompleted → calls Poke
s.HandleIPCEvents(s.Core(), messages.AgentCompleted{
c.ACTION(messages.AgentCompleted{
Workspace: "ws-test", Repo: "go-io", Status: "completed",
})
select {
case <-s.pokeCh:
// poke received
default:
t.Log("poke signal may not have been received synchronously")
}
require.Eventually(t, func() bool { return len(poked) == 1 }, time.Second, 10*time.Millisecond)
}
func TestHandlers_IngestOnCompletion_Good(t *testing.T) {
@ -114,6 +115,84 @@ func TestHandlers_PokeQueue_Good(t *testing.T) {
// Should call drainQueue without panic
}
func TestHandlers_RegisterHandlers_Good_CompletionPipeline(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)
workspaceName := "core/go-io/task-5"
workspaceDir := core.JoinPath(root, "workspace", "core", "go-io", "task-5")
require.True(t, fs.EnsureDir(core.JoinPath(workspaceDir, "repo")).OK)
require.NoError(t, writeStatus(workspaceDir, &WorkspaceStatus{
Status: "completed",
Repo: "go-io",
Branch: "agent/fix-tests",
Agent: "codex",
}))
var mu sync.Mutex
called := make(map[string]bool)
mark := func(name string) {
mu.Lock()
called[name] = true
mu.Unlock()
}
seen := func(name string) bool {
mu.Lock()
defer mu.Unlock()
return called[name]
}
c := core.New()
c.Config().Enable("auto-ingest")
RegisterHandlers(c, &PrepSubsystem{})
c.Action("agentic.qa", func(_ context.Context, options core.Options) core.Result {
if options.String("workspace") == workspaceDir {
mark("qa")
}
c.ACTION(messages.QAResult{Workspace: workspaceName, Repo: "go-io", Passed: true})
return core.Result{OK: true}
})
c.Action("agentic.auto-pr", func(_ context.Context, options core.Options) core.Result {
if options.String("workspace") == workspaceDir {
mark("auto-pr")
}
c.ACTION(messages.PRCreated{
Repo: "go-io",
Branch: "agent/fix-tests",
PRURL: "https://forge.lthn.ai/core/go-io/pulls/12",
PRNum: 12,
})
return core.Result{OK: true}
})
c.Action("agentic.verify", func(_ context.Context, options core.Options) core.Result {
if options.String("workspace") == workspaceDir {
mark("verify")
}
return core.Result{OK: true}
})
c.Action("agentic.ingest", func(_ context.Context, options core.Options) core.Result {
if options.String("workspace") == workspaceDir {
mark("ingest")
}
return core.Result{OK: true}
})
c.Action("runner.poke", func(_ context.Context, _ core.Options) core.Result {
mark("poke")
return core.Result{OK: true}
})
c.ACTION(messages.AgentCompleted{
Workspace: workspaceName,
Repo: "go-io",
Status: "completed",
})
require.Eventually(t, func() bool {
return seen("qa") && seen("auto-pr") && seen("verify") && seen("ingest") && seen("poke")
}, time.Second, 10*time.Millisecond)
}
func TestHandlers_IngestDisabled_Bad(t *testing.T) {
root := t.TempDir()
t.Setenv("CORE_WORKSPACE", root)

View file

@ -30,6 +30,7 @@ func Register(c *core.Core) core.Result {
c.Config().Enable("auto-pr")
c.Config().Enable("auto-merge")
c.Config().Enable("auto-ingest")
RegisterHandlers(c, subsystem)
return core.Result{Value: subsystem, OK: true}
}